工作组之间的同步对于OpenCL而言是一个相对比较棘手的问题。正如之前已经提到,由于大部分主流计算设备都不支持原生的工作组之间的同步特性,所以OpenCL标准本身不会对工作组间的同步有任何相关功能上的定义。倘若有某个计算设备能支持工作组之间的同步,那也得自己提供私有内建函数来支持它。下面提供三种通常用于工作组之间做数据同步的手段。
第一种方式,通过原子操作对全局存储器进行原子修改。其实,这种方法我们在上面对一个数组所有元素求和以及求积的例子中已经展示了。如果工作组不多,这种方法会显得更方便,更高效。但是,如果工作组的数量一旦很多,那么这种方式可能就会对性能有所影响了。毕竟,一个原子操作会对存储器总线做锁步操作,再加上对全局存储器的读写本来就是一个非常缓慢的操作,这个过程中不会有、也不可能有任何数据Cache的帮忙,所以开销是巨大的。
第二种方式,我们不妨把需要综合(synthesize)的结果输出到主机端,让主机端处理器对剩余结果进行处理。就拿对数组所有元素求和来说,如果我们一共有8192个元素,而计算设备的工作组最大能含有256个工作项,那么最后只要输出8192/256=32个数据元素。对32个数据元素求和在主机端上而言就如同嚼豆腐一样容易。但如果数组元素更多一些,而且计算再复杂一些呢?由于CPU端上的线性计算与GPGPU等加速设备比起来仍然略逊一筹,所以我们也得不到更好的性能提升。
第三种方式,这个时候我们还是将每个工作组计算得到的数据输出到全局存储器中,等这些数据传完之后,我们在内核程序中再对这些数据做进一步处理。而由于工作组之间本身没有任何现成的同步机制,因此,我们这里需要借助互斥体来进行同步。大致的思路是,为每一个工作组分配一个互斥体作为锁。当一个工作组大小的元素个数都写进全局存储器之后,那么相应的工作组就可以开始计算了。我们用相应于工作组中最后一个工作项的工作组ID来解锁,而用工作组的第一个工作项进行上锁。每次操作完,工作组的个数就变为原来的工作组中最大能包含的工作项的个数(如果最大工作组尺寸为256,就是1/256)。然后,我们用这种方式不断缩减所要计算的元素个数,这个过程称为缩减(reduction)。然后,等我们要处理的元素个数缩减到少于一个工作组最大能包含的工作项的个数之后,我们就把这些元素直接传给CPU端来执行了。
下面我们综合第二种和第三种方式来给出最后一个代码示例。该示例仍然是对一个数组计算其所有元素的。不过所采用的方法如第三种同步方案所介绍的那样,先对每个工作组中的元素进行求和,然后写到相应的全局存储器中暂存,等最大工作组个数的元素都处理完之后就能进入到下一次迭代。这样,在一次迭代中我们就能消耗最大工作组元素个数的平方个元素。而迭代次数仅需要logmax-work-group-size(元素个数总数)。作为最后一个代码示例,我们将给出完整的代码,而不仅仅是代码片段。以下是主机端的代码:
#ifdef _APPLE_
#include 〈OpenCL/opencl.h>
#else
#include 〈CL/cl.h>
#endif
#include 〈stdio.h>
#include 〈string.h>
#include 〈stdlib.h>
#include 〈sys/time.h>
int main(void)
{
cl_int ret;
cl_platform_id platform_id = NULL;
cl_device_id device_id = NULL;
cl_context context = NULL;
cl_command_queue command_queue = NULL;
cl_mem src1MemObj = NULL;
cl_mem src2MemObj = NULL;
cl_mem dstMemObj = NULL;
char *kernelSource = NULL;
cl_program program = NULL;
cl_kernel kernel = NULL;
cl_kernel kernel2 = NULL;
int *pHostBuffer = NULL;
int *pDeviceBuffer = NULL;
//获得OpenCL平台
clGetPlatformIDs(1, &platform_id, NULL);
if(platform_id == NULL)
{
puts("Get OpenCL platform failed!");
goto FINISH;
}
//获得OpenCL计算设备,这里使用GPU类型的计算设备
clGetDeviceIDs(platform_id, CL_DEVICE_TYPE_GPU, 1, &device_id,
NULL);
if(device_id == NULL)
{
puts("No GPU available as a compute device!");
goto FINISH;
}
//根据设备ID来创建上下文
context = clCreateContext(NULL, 1, &device_id, NULL, NULL,
&ret);
if(context == NULL)
{
puts("Context not established!");
goto FINISH;
}
//根据上下文与设备ID来创建命令队列
command_queue = clCreateCommandQueue(context, device_id, 0,
&ret);
if(command_queue == NULL)
{
puts("Command queue cannot be created!");
goto FINISH;
}
//我们分配64MB的存储空间,将主机端的数据传递到设备端
const size_t contentLength = sizeof(int) * 8 * 1024 * 1024;
src1MemObj = clCreateBuffer(context, CL_MEM_READ_ONLY,
contentLength, NULL, &ret);
if(src1MemObj == NULL)
{
puts("Source1 memory object failed to create!");
goto FINISH;
}
src2MemObj = clCreateBuffer(context, CL_MEM_READ_WRITE,
contentLength, NULL, &ret);
if(src2MemObj == NULL)
{
puts("Source2 memory object failed to create!");
goto FINISH;
}
//在主机端分配并初始化64MB的缓存数据
pHostBuffer = malloc(contentLength);
for(int i = 0; i 〈 contentLength / sizeof(int); i++)
pHostBuffer[i] = i + 1;
//对src1MemObj的数据传输,我们使用非阻塞方式,
//等后续设置完成后通过事件等待机制进行同步
ret = clEnqueueWriteBuffer(command_queue, src1MemObj, CL_TRUE,
0,contentLength, pHostBuffer, 0,
NULL, NULL);
if(ret != CL_SUCCESS)
{
puts("Data1 transfer failed");
goto FINISH;
}
//指定内核源文件路径,这个路径根据读者当前环境可以更改
//这里使用绝对路径也是避免不同系统需要调用不同API来获取当前路径
const char *pFileName = "/Users/zennychen/Downloads/test.cl";
FILE *fp = fopen(pFileName, "r");
if (fp == NULL)
{
puts("The specified kernel source file cannot be opened!");
goto FINISH;
}
fseek(fp, 0, SEEK_END);
const long kernelLength = ftell(fp);
fseek(fp, 0, SEEK_SET);
kernelSource = malloc(kernelLength);
fread(kernelSource, 1, kernelLength, fp);
fclose(fp);
program = clCreateProgramWithSource(context, 1,
(const char**)&kernelSource,
(const size_t *)&kernelLength,
&ret);
//获取最大工作组大小
size_t maxWorkGroupSize = 0;
clGetDeviceInfo(device_id, CL_DEVICE_MAX_WORK_GROUP_SIZE,
sizeof(maxWorkGroupSize), &maxWorkGroupSize,
NULL);
//在编译选项中定义一个名为GROUP_NUMBER_OF_WORKITEMS的宏,
//用于指定每个工作组一共有多少工作项
sprintf(kernelSource, "-D GROUP_NUMBER_OF_WORKITEMS=%zu",
maxWorkGroupSize);
ret = clBuildProgram(program, 1, &device_id, kernelSource, NULL,
NULL);
if (ret != CL_SUCCESS)
{
size_t len;
char buffer[8 * 1024];
printf("Error: Failed to build program executable!\n");
clGetProgramBuildInfo(program, device_id,
CL_PROGRAM_BUILD_LOG,
sizeof(buffer), buffer, &len);
printf("%s\n", buffer);
goto FINISH;
}
//kernelSource后面不再使用,这里可以立即对它释放
free(kernelSource);
kernelSource = NULL;
//创建内核函数
kernel = clCreateKernel(program, "kernel_test", &ret);
if(kernel == NULL)
{
puts("Kernel failed to create!");
goto FINISH;
}
//创建用于存放锁以及作为输出剩余元素个数的缓存
//由于第一次迭代中锁的个数为总workgroup的个数除以maxWorkGroupSize
src2MemObj = clCreateBuffer(context, CL_MEM_READ_WRITE,
contentLength / maxWorkGroupSize / maxWorkGroupSize,
NULL, &ret);
if(ret != CL_SUCCESS)
{
puts("Lock buffer creating failed!");
goto FINISH;
}
//创建用于结果输出的缓存对象
//用户输出结果的缓存最多只有maxWorkGroupSize * maxWorkGroupSize
//个元素
dstMemObj = clCreateBuffer(context, CL_MEM_READ_WRITE,
maxWorkGroupSize * maxWorkGroupSize * sizeof(int),
NULL, &ret);
if(ret != CL_SUCCESS)
{
puts("Destination buffer creating failed!");
goto FINISH;
}
ret = clSetKernelArg(kernel, 0, sizeof(cl_mem),
(void *)&dstMemObj);
ret |= clSetKernelArg(kernel, 1, sizeof(cl_mem),
(void *)&src1MemObj);
ret |= clSetKernelArg(kernel, 2, sizeof(cl_mem),
(void *)&src2MemObj);
if(ret != CL_SUCCESS)
{
puts("Set arguments error!");
goto FINISH;
}
//将内核执行命令排入命令队列
ret = clEnqueueNDRangeKernel(command_queue, kernel, 1, NULL,
(const size_t[])
{contentLength / sizeof(int)},
(const size_t[]){maxWorkGroupSize},
0,NULL, NULL);
if(ret != CL_SUCCESS)
{
puts("kernel1 execution failed");
goto FINISH;
}
//这里用clFinish做命令执行同步
clFinish(command_queue);
//准备做校验
pDeviceBuffer = malloc(contentLength);
//这里使用阻塞的方式读取数据
int remainCount;
//先获取剩余的数组元素的个数
clEnqueueReadBuffer(command_queue, src2MemObj, CL_TRUE, 0,
sizeof(int),
&remainCount, 0, NULL, NULL);
//然后获取相关剩余的元素
clEnqueueReadBuffer(command_queue, dstMemObj, CL_TRUE, 0,
remainCount * sizeof(int),
pDeviceBuffer, 0, NULL, NULL);
//做数据校验
int deviceSum = 0;
for(int i = 0; i 〈 remainCount; i++)
deviceSum += pDeviceBuffer[i];
int hostSum = 0;
for(int i = 0; i 〈 contentLength / sizeof(int); i++)
hostSum += pHostBuffer[i];
puts(deviceSum == hostSum ? "OK" : "NG");
FINISH:
if(pHostBuffer != NULL)
free(pHostBuffer);
if(pDeviceBuffer != NULL)
free(pDeviceBuffer);
if(kernelSource != NULL)
free(kernelSource);
if(src1MemObj != NULL)
clReleaseMemObject(src1MemObj);
if(src2MemObj != NULL)
clReleaseMemObject(src2MemObj);
if(dstMemObj != NULL)
clReleaseMemObject(dstMemObj);
if(kernel != NULL)
clReleaseKernel(kernel);
if(kernel2 != NULL)
clReleaseKernel(kernel2);
if(program != NULL)
clReleaseProgram(program);
if(command_queue != NULL)
clReleaseCommandQueue(command_queue);
if(context != NULL)
clReleaseContext(context);
puts("Program complete");
return 0;
}
上述代码中几乎每条关键语句都有相关注释,这里不追加讲解。下面给出完整的OpenCL内核代码:
//我们将第二个源修改为read-write属性,然后将它作为存放锁的全局存储空间
__kernel void kernel_test(__global int *pDst,
{
__global int *pSrc1, __global int *pLocks)
local int tmpBuffer[GROUP_NUMBER_OF_WORKITEMS];
const int index = get_local_id(0);
const int group_index = get_group_id(0);
int group_count = get_num_groups(0);
int address_offset = group_index * GROUP_NUMBER_OF_WORKITEMS +
index;
//第一次迭代从pSrc1获取数据
__global int *pData = pSrc1;
while(group_count >= GROUP_NUMBER_OF_WORKITEMS
&& group_index 〈 group_count)
{
if(index == 0)
{
//这里使用按位或来设置锁,由于不管哪个整数,
//与1相或之后都不会为0,这里用非0表示上锁状态;0表示解锁状态
atomic_or(&pLocks[group_index], 1);
}
//对当前工作组中的所有工作项先暂存局部存储器
tmpBuffer[index] = pData[address_offset];
barrier(CLK_LOCAL_MEM_FENCE);
if(index == 0)
{
//用第一个工作项做求和计算
int sum = 0;
for(int i = 0; i 〈 GROUP_NUMBER_OF_WORKITEMS; i++)
sum += tmpBuffer[i];
//输出到对应工作组索引的全局输出存储器
pDst[group_index] = sum;
//这里假定GROUP_NUMBER_OF_WORKITEMS是2的n次幂
//由于绝大多数计算设备都能满足这个条件
if((group_index & (GROUP_NUMBER_OF_WORKITEMS - 1)) ==
GROUP_NUMBER_OF_WORKITEMS - 1)
{
//让工作组索引对应于一个工作组
//最后一个工作项的工作项进行解锁
atomic_xchg(&pLocks[group_index /
GROUP_NUMBER_OF_WORKITEMS]
, 0);
}
}
//后续要处理的工作组个数需要除以最大工作组大小
//上述已经处理了GROUP_NUMBER_OF_WORKITEMS个工作组的元素个数,
//即一共GROUP_NUMBER_OF_WORKITEMS *GROUP_NUMBER_OF_WORKITEMS
//个元素
group_count /= GROUP_NUMBER_OF_WORKITEMS;
if(index == 0 && group_index 〈 group_count)
{
//对于剩余的工作项,在每一个工作组中用第一个工作项等待锁被打开
int lock;
do
{
//对于OpenCL 2.0以下的版本,
//我们可以使用原子加法来模拟OpenCL 2.0中的atomic_load
lock = atomic_add(&pLocks[group_index], 0);
}
while(lock != 0);
}
//从第二次迭代开始,从pDst获取数据
pData = pDst;
}
//将pLock作为剩余要被主机端处理的元素个数输出
if(get_global_id(0) == 0)
pLocks[0] = group_count * GROUP_NUMBER_OF_WORKITEMS;
}
上述的内核代码对于数组求和算法而言尽管不能算是最高效的,但是作为对工作组之间的同步策略而言已经具有代表性。
最后想提醒各位读者的是,一个计算设备上同时并发执行工作组的个数是有限的。这意味着如果当前所有活跃着的工作组对某个或某些非活跃的工作组产生依赖等待,那么内核程序会被挂死。因为一个活跃的工作组需要被隐退(也就是完全执行完)之后才能把非活跃的工作组调度进来执行。比如以下例子:
我们先在主机端增加一个当前计算设备有多少个CU,用宏来定义,然后传给内核程序。
//获取最大工作组大小
size_t maxWorkGroupSize = 0;
clGetDeviceInfo(device_id, CL_DEVICE_MAX_WORK_GROUP_SIZE,
sizeof(maxWorkGroupSize), &maxWorkGroupSize, NULL);
cl_uint maxComputeUnits = 0;
clGetDeviceInfo(device_id, CL_DEVICE_MAX_COMPUTE_UNITS,
sizeof(maxComputeUnits), &maxComputeUnits, NULL);
//在编译选项中定义一个名为GROUP_NUMBER_OF_WORKITEMS的宏,
//用于指定每个工作组一共有多少工作项
sprintf(kernelSource,
"-D GROUP_NUMBER_OF_WORKITEMS=%zu -D MAX_COMPUTE_UNITS=%u",
maxWorkGroupSize, maxComputeUnits);
以下是内核程序:
kernel void kernel_test(global volatile int *pDst,
global int *pSrc1, global int *pSrc2)
{
const int gid = get_group_id(0);
const int index = get_local_id(0);
if(index == 0)
{
if(gid 〈 MAX_COMPUTE_UNITS * 3 / 2)
while(pDst[1] == 0);
if(gid == 10000)
pDst[1] = 1;
atomic_add(pDst, 1);
}
}
在Intel HD Graphics 5000下,如果是CU个数的3/2情况下,仍然可以正常运行完,因为大部分计算设备的每个CU在某些情况下可以调度几个工作组进行执行,如果所需的执行资源足够。而如果把上述代码中的3/2改成2,那么程序就挂了。内核程序会执行超时。
所以我们在牵扯到工作组之间的同步时需要务必当心,不能同时让过多的工作组依赖某一个或某一些工作组,尤其是相互依赖的工作组之间索引跨度很大的场合一定要避免。