OpenCL 内建管道读/写函数

OpenCL 2.0中引入了一种新的工作机制用于在不同内核间传递数据,这种新的机制就是管道(pipe)。

一个管道实际上就是一个结构化FIFO缓冲,是通过类型修饰符pipe关键字来标识,由数据包(packet)的集合空间构成。这些数据包在管道中是有序放置的,管道只能被内核函数访问,而不能被主机访问。
例如:

pipe int4 pipeA;//int4包的一个管道
pipe user_type_t pipeB;//用户自定包的一个管道

可以使用OpenCL 函数修饰符中的访问修饰符read_only或write_only来限定管道在内核中的读写权限。默认修饰符是read_only。内核不能对同一个管道又读又写,所以管道用read_write修饰符会有编译错误。

管道(即pipe对象)只能作为函数(包含内核函数)参数传入,而不能用在函数内部进行声明或作为程序全局对象使用。

OpenCL 2.0中,增加了管道操作的内建函数,下面我们就分别来讲述这些内建函数。

内建管道读/写函数

下表列出了内建管道读和写函数。在开始讲解这些函数前,我们做一个约定:使用通用类型符gentype来指示这些函数参数可以取OpenCL内建矢量、内建标量、浮点数和用户自定义数据类型。

内建管道读/写函数

下面我们通过一个简单的例子来说明上表中函数的用法。例子中有两个内核函数,分别是pipe_producer和pipe_consumer,pipe_producer向管道中写入数据,而pipe_consumer从管道中读取数据。主机和设备端代码如下:

内核代码:

kernel void pipe_producer(global float *src,
{
}
                              write_only pipe float out_pipe)
    int gid = get_global_id(0);
    reserve_id_t res_id;
    //每个work-item向预留区域管道中读取1个包数据
    res_id = reserve_write_pipe(out_pipe, 1);
    float src_pipe = src[gid];
    if(is_valid_reserve_id(res_id))
    {
        //把src_pipe数据写入管道中
        if(write_pipe(out_pipe, res_id, 0, &src_pipe) != 0)
              return;
        //提交写入操作
        commit_write_pipe(out_pipe, res_id);
    }
}
kernel void pipe_consumer(global float *dst,
{
read_only pipe float in_pipe)
 int gid = get_global_id(0);
 reserve_id_t res_id;
 //每个work-item从预留区域管道中写入1个包数据
 res_id = reserve_read_pipe(in_pipe, 1);
 float dst_pipe;
 if(is_valid_reserve_id(res_id))
 {
   //从管道中读取1个包数据到dst_pipe
   if(read_pipe(in_pipe, res_id, 0, &dst_pipe) != 0)
      return;
  //提交读取操作
  commit_read_pipe(in_pipe, res_id);
 }
 dst[gid] = dst_pipe;
}

主机端代码:

……
int numPackets = 16384;
int packetSize = sizeof(float);
kernel_pipe_consumer = clCreateKernel(program, "pipe_consumer",
&err);
checkErr(err, _LINE_) ;
kernel_pipe_producer = clCreateKernel(program, "pipe_producer",
                                              &err);
checkErr(err, _LINE_) ;
//细粒度SVM
src = (float *)clSVMAlloc(context, CL_MEM_READ_WRITE |
                              CL_MEM_SVM_FINE_GRAIN_BUFFER, sizeBytes,
                              0);
dst = (float *)clSVMAlloc(context, CL_MEM_READ_WRITE |
                              CL_MEM_SVM_FINE_GRAIN_BUFFER, sizeBytes,
                              0);
if(src == NULL || dst == NULL)
{
    printf("allocate SVM error!\n");
    return 0;
}
//创建管道,包数据类型为float,包个数为numPackets个
pipe = clCreatePipe(context, CL_MEM_HOST_NO_ACCESS, packetSize,
                        numPackets, NULL, &err);
checkErr(err, _LINE_);
for (int i = 0; i 〈 numPackets; i++)
{
    src[i] = i;
    dst[i] = 0.0f;
}
size_t globalworksize = numPackets;
size_t localworksize = 128;
//producer
err = clSetKernelArgSVMPointer(kernel_pipe_producer, 0, src);
checkErr(err, _LINE_);
err = clSetKernelArg(kernel_pipe_producer, 1, sizeof(cl_mem),
                        &pipe);
checkErr(err, _LINE_);
err = clEnqueueNDRangeKernel(cmdQueue0, kernel_pipe_producer, 1,
                                  NULL,
                                  &globalworksize, &localworksize, 0,
                                  NULL, &producer_event);
checkErr(err, _LINE_);
//consumer
err = clSetKernelArgSVMPointer(kernel_pipe_consumer, 0, dst);
checkErr(err, _LINE_);
err = clSetKernelArg(kernel_pipe_consumer, 1, sizeof(cl_mem),
                        &pipe);
checkErr(err, _LINE_);
err = clEnqueueNDRangeKernel(cmdQueue1, kernel_pipe_consumer, 1,
                                  NULL,
&globalworksize, &localworksize, 1,
&producer_event, NULL);
checkErr(err, _LINE_);
err = clFinish(cmdQueue1);
checkErr(err, _LINE_);

上述代码使用管道实现了生产-消费模型,管道作为参数传入pipe_producer内核,内核中产生数据写入管道中;在pipe_consumer内核中,从管道读取pipe_producer内核中生成的写入的数据。在pipe_producer中每个工作项向管道中写入1个包数据,需要注意的是写入管道中的顺序是乱序的。OpenCL执行模型中,工作组的执行是相互独立的,我们无法控制全局工作项间读取/写入管道的顺序,但是我们可以在工作组内控制读取/写入次序,具体内核代码如下:

kernel void pipe_producer(global float *src,
{
}
                              write_only pipe float out_pipe)
    int gid = get_global_id(0);
    local reserve_id_t res_id;
    if(get_local_id(0) == 0)
        //预留区域管道写入get_local_size(0)个包
        res_id = reserve_write_pipe(out_pipe, get_local_size(0));
    barrier(CLK_LOCAL_MEM_FENCE);
    float src_pipe = src[gid] ;
    if(is_valid_reserve_id(res_id))
    {
        //根据工作项工作组内索引id写入管道对应位置
        if(write_pipe(out_pipe, res_id, get_local_id(0), &src_pipe)
            != 0)
              return;
        commit_write_pipe(out_pipe, res_id);
    }
kernel void pipe_consumer(global float *dst,
{
                              read_only pipe float in_pipe)
    int gid = get_global_id(0);
    local reserve_id_t res_id;
    if(get_local_id(0) == 0)
        //预留区域管道读取get_local_size(0)个包
        res_id = reserve_read_pipe(in_pipe, get_local_size(0));
    barrier(CLK_LOCAL_MEM_FENCE);
    float dst_pipe;
    if(is_valid_reserve_id(res_id))
    {
      //根据工作项工作组内索引id读取管道对应位置
      if(read_pipe(in_pipe, res_id, get_local_id(0), &dst_pipe) != 0)
        return;
        commit_read_pipe(in_pipe, res_id);
}
dst[gid] = dst_pipe;
}

在上述代码中,我们使用局部存储器声明预留区域ID res_id,在每个工作组中的第一个工作项确定预留区域写入get_local_size(0)个包,在写入和读取管道时根据工作项在工作组内的索引来确定管道对应位置,从而达到了对工作组内管道数据的顺序写入和读取。
除了使用局部存储器,我们也可以使用工作组管道读/写函数来实现工作组内的顺序操作。

Camera课程

Python教程

Java教程

Web教程

数据库教程

图形图像教程

办公软件教程

Linux教程

计算机教程

大数据教程

开发工具教程