I am tiring to achieve a simple operation using the CUDA. I need to reduce 1D vector by 1D mask. So for example for a data array [8,9,1,9,6] and a mask [0,1,1,0,1] I should get a result [9,1,6]. I tried to create a parallel CUDA function to handle this task. Nevertheless, I was not successful.
In order to make the parallel process for the CUDA, I decided to split the input vector to sectors with similar sizes inside a function ReduceVectorByMask_D_G. Each sector is processed independently by the separated thread. Each thread has one counter (coutersInput_G) to index input array and one counter (countersOutput_G) to index output array. As the thread browse the mask, values where mask is non-zero are copied to the output array (which also leads to the increment of the output counter). In order to increment the counters safely, I use a function atomicAdd. Nevertheless, the counters are not incremented correctly and the values remain the same in each run of the thread. It looks like the original values of the counters are somehow cached and the CUDA does not assume the counters could be changed during individual runs of the function ReduceVectorByMask_D_G_G. Once the execution of the function ReduceVectorByMask_D_G_G ends, individual counters end up with correct values.
#include "cuda_runtime.h"
#include "device_launch_parameters.h"
#include <stdio.h>
#include <math.h>
#define THREADS 5//Use only 5 threads for simplicity.
#define SETVECTORVALUE_G_G int i = blockDim.x * blockIdx.x + threadIdx.x;if (i < length){vector_G[i] = value;}
#define CUDAVALLOC1(typ) typ* output_G=NULL;\
cudaMalloc((void**)&output_G, length * sizeof(typ));\
if (output_G == NULL)\
{\
return NULL;\
}\
int gridSize = (int)ceil(((double)length / (double)THREADS))
/// <summary>
/// Set all values in vector to specific int value.
/// </summary>
/// <param name="vector_G">Vector to be edited.</param>
/// <param name="length">Number of elements.</param>
/// <param name="value">The value to be set.</param>
__global__ void SetVectorValue_I32_G_G(int* vector_G, size_t length, int value)
{
SETVECTORVALUE_G_G;//Set values for individual elements.
}
/// <summary>
/// Allocate array in GPU memory and fill with given value.
/// </summary>
/// <param name="length">Number of elements.</param>
/// <param name="value">Value to be used for filling.</param>
/// <returns>Returns address of the new array.</returns>
int* CudaValloc_I32_G(size_t length, int value)
{
CUDAVALLOC1(int);//Initialize arrays using type int.
SetVectorValue_I32_G_G << <gridSize, THREADS >> > (output_G, length, value);//Set individual values of the output array to specific value in GPU.
cudaDeviceSynchronize();//Wait for the process to finish.
return output_G;
}
/// <summary>
/// Set all values in vector to specific double value.
/// </summary>
/// <param name="vector_G">Vector to be edited.</param>
/// <param name="length">Number of elements.</param>
/// <param name="value">The value to be set.</param>
__global__ void SetVectorValue_D_G_G(double* vector_G, size_t length, double value)
{
SETVECTORVALUE_G_G;//Set values for individual elements.
}
/// <summary>
/// Allocate array in GPU memory and fill with given value.
/// </summary>
/// <param name="length">Number of elements.</param>
/// <param name="value">Value to be used for filling.</param>
/// <returns>Returns address of the new array.</returns>
double* CudaValloc_D_G(size_t length, double value)
{
CUDAVALLOC1(double);//Initialize arrays using type double.
SetVectorValue_D_G_G << <gridSize, THREADS >> > (output_G, length, value);//Set individual values of the output array to specific value in GPU.
cudaDeviceSynchronize();//Wait for the process to finish.
return output_G;
}
/// <summary>
/// Sum vector elements in individual threads. Each thread has its own accumulator.
/// </summary>
/// <param name="vector_G">Vector to be summed to individual counters.</param>
/// <param name="length">Length of the vector_G.</param>
/// <param name="counters_G">Individual accumulators for individual threads should be zeroed before function call. The length is the number of threads.</param>
__global__ void SumVectorValues_D_I32_G(int* vector_G, size_t length, int* counters_G)
{
int i = blockDim.x * blockIdx.x + threadIdx.x;//OVerall index in the input.
if (i < length)
{
atomicAdd((int*)(counters_G + threadIdx.x), vector_G[i]);//Safe add.
}
}
/// <summary>
/// Summation of vector values using cuda.
/// </summary>
/// <param name="vector_G">Vector of integers to be summed.</param>
/// <param name="length">Length of the vector.</param>
/// <returns>Sum of the input vector.</returns>
int SumVectorValues_I32_G(int* vector_G, size_t length)
{
int output = 0;//The sum to be returned.
int gridSize = (int)ceil(((double)length / (double)THREADS));//Determine how many times the threads need to be run to cover whole array.
int* counters_G = CudaValloc_I32_G(THREADS, 0);//Create counter for each thread.
int* counters = (int*)malloc(THREADS * sizeof(int));//Prepare space for counters in the RAM.
SumVectorValues_D_I32_G << <gridSize, THREADS >> > (vector_G, length, counters_G);//Run individual threads.
cudaDeviceSynchronize();//Wait for the process to finish.
cudaMemcpy(counters, counters_G, THREADS * sizeof(int), cudaMemcpyDeviceToHost);//Move counters to RAM.
for (size_t i = 0; i < THREADS; i++)//Sum individual counters to one value.
{
output += counters[i];
}
cudaFree(counters_G);
free(counters);
return output;
}
/// <summary>
/// Process independently individual sectors of input to output.
/// </summary>
/// <param name="vector_G">The whole input data vector to be processed.</param>
/// <param name="mask_G">The whole input data vector to be used.</param>
/// <param name="inputSectorStarts_G">Relative addresses of sector starts in input.</param>
/// <param name="inputSectorSizes_G">Lengths of individual input sectors.</param>
/// <param name="outputSectorStarts_G">Relative addresses of sector starts in output.</param>
/// <param name="coutersInput_G">Zeroed counters to address input.</param>
/// <param name="countersOutput_G">Zeroed counters to address output.</param>
/// <param name="output_G">Array to strore output values.</param>
__global__ void ReduceVectorByMask_D_G_G(double* vector_G, int* mask_G, size_t* inputSectorStarts_G, size_t* inputSectorSizes_G, size_t* outputSectorStarts_G, int* coutersInput_G, int* countersOutput_G, double* output_G)
{
if (coutersInput_G[threadIdx.x] < inputSectorSizes_G[threadIdx.x])//Check counter to be in the current sector borders.
{//Current thread is still in its sector borders.
if (mask_G[inputSectorStarts_G[threadIdx.x] + coutersInput_G[threadIdx.x]])//Check mask value.
{//Mask is true the value copy will be done.
output_G[outputSectorStarts_G[threadIdx.x] + countersOutput_G[threadIdx.x]] = vector_G[inputSectorStarts_G[threadIdx.x] + coutersInput_G[threadIdx.x]];//Copy value from input array to the output array.
atomicAdd(countersOutput_G + ((int)threadIdx.x), 1);//Increment output counter to point to the next empty address.
}
atomicAdd(coutersInput_G + ((int)threadIdx.x), 1);//Increment input counter to point to the next value to be checked.
}
}
/// <summary>
/// Removes data points where mask is zero.
/// </summary>
/// <param name="vector_G">Input data to be reduced.</param>
/// <param name="mask_G">Mask to used for reduction. Contains values 1 or 0.</param>
/// <param name="vectorCount">Number of elements in the mask and in the input vector.</param>
/// <param name="countOfValidElements">Count of the non-zero elements in the mask.</param>
/// <returns>Reduced data vector in GPU memory.</returns>
double* ReduceVectorByMask_D_G(double* vector_G, int* mask_G, size_t vectorCount, size_t* countOfValidElements)
{
int gridSize = (int)ceil(((double)vectorCount / (double)THREADS));//How many times the threads need to be run to browse whole input array.
size_t* inputSizes = (size_t*)calloc(THREADS, sizeof(size_t));//Sizes of individual input blocks for individual threads.
size_t* inputSectorSizes_G = NULL;//Sector sizes of the input vector for individual threads.
size_t* outputSectorSizes = (size_t*)calloc(THREADS, sizeof(size_t));//Sector sizes of the output vector for individual threads.
size_t* outputSectorStarts = (size_t*)calloc(THREADS, sizeof(size_t));//Where in the output array individual sectors start.
size_t* inputSectorStarts = (size_t*)calloc(THREADS, sizeof(size_t));//Where in the input array individual sectors start.
size_t* inputSectorStarts_G = NULL;//Clone of the inputSectorStarts stored in the GPU.
size_t* outputSectorStarts_G = NULL;//Clone of the outputSectorStarts stored in the GPU.
int* countersInput_G = CudaValloc_I32_G(THREADS, 0);//Initialize and zero counters for incremental addressing of the input vector.
int* countersOutput_G = CudaValloc_I32_G(THREADS, 0);//Initialize and zero counters for incremental addressing of the output vector.
double* output_G = NULL;//Output vector.
cudaMalloc((void**)&inputSectorSizes_G, THREADS * sizeof(size_t));//Allocate GPU working arrays.
cudaMalloc((void**)&inputSectorStarts_G, THREADS * sizeof(size_t));//Allocate GPU working arrays.
cudaMalloc((void**)&outputSectorStarts_G, THREADS * sizeof(size_t));//Allocate GPU working arrays.
for (size_t i = 0; i < THREADS - 1; i++)//Browse threads and split input vector equidistantly.
{
inputSizes[i] = vectorCount / THREADS;//Division and flooring.
inputSectorStarts[i + 1] = inputSectorStarts[i] + inputSizes[i];//Address of the start of the new sector is calculated as the address of the start of the previous sector plus size of previous sector.
}
inputSizes[THREADS - 1] = vectorCount - vectorCount / THREADS * (THREADS - 1);//Last sector contains remaining elements as the number of input elements in not dividable by threadNumber.
cudaMemcpy(inputSectorStarts_G, inputSectorStarts, THREADS * sizeof(size_t), cudaMemcpyHostToDevice);//Move sector starts addresses to the GPU.
int* mask_G_temp = mask_G;//Prepare incremental pointer to the input mask.
countOfValidElements[0] = 0;//Zero overall counter.
for (size_t i = 0; i < THREADS; i++)//Browse individual sectors in mask to find how large sectors need to be in the output.
{
outputSectorSizes[i] = (size_t)SumVectorValues_I32_G(mask_G_temp, inputSizes[i]);//Get number of non-zero mask elements.
countOfValidElements[0] += outputSectorSizes[i];//Add to the overal output size.
mask_G_temp += inputSizes[i];//Move to next sector.
}
output_G = CudaValloc_D_G(countOfValidElements[0], 0);//Allocate output vector with zeros using precalculated number of non-zero elements.
for (size_t i = 0; i < THREADS - 1; i++)//Calculate sectors starts in the output.
{
outputSectorStarts[i + 1] = outputSectorStarts[i] + outputSectorSizes[i];//Current sector start is the previous sector start address plus previous sector size.
}
cudaMemcpy(outputSectorStarts_G, outputSectorStarts, THREADS * sizeof(size_t), cudaMemcpyHostToDevice);//Move data to GPU to be accepted by __global__ functions.
cudaMemcpy(inputSectorSizes_G, inputSizes, THREADS * sizeof(size_t), cudaMemcpyHostToDevice);//Move data to GPU to be accepted by __global__ functions.
ReduceVectorByMask_D_G_G << <gridSize, THREADS >> > (vector_G, mask_G, inputSectorStarts_G, inputSectorSizes_G, outputSectorStarts_G, countersInput_G, countersOutput_G, output_G);//Run kernel function
cudaDeviceSynchronize();//Wait for result
cudaFree(countersInput_G);//Clear data from the GPU.
cudaFree(countersOutput_G);//Clear data from the GPU.
cudaFree(inputSectorSizes_G); //Clear data from the GPU.
cudaFree(inputSectorStarts_G);//Clear data from the GPU.
cudaFree(outputSectorStarts_G);//Clear data from the GPU.
free(inputSizes);//Clear data from the RAM.
free(outputSectorSizes);//Clear data from the RAM.
free(outputSectorStarts);//Clear data from the RAM.
free(inputSectorStarts);//Clear data from the RAM.
return output_G;//Return reduced vector in GPU.
}
/// <summary>
/// Generates testing data and test the ReduceVectorByMask_D_G function
/// </summary>
/// <returns></returns>
int main()
{
double vector[100] = {8,9,1,9,6,0,2,5,9,9,1,9,9,4,8,1,4,9,7,9,6,0,8,9,6,7,7,3,6,1,7,0,2,0,0,8,6,3,9,0,4,3,7,7,1,4,4,6,7,7,2,6,6,1,1,4,9,3,5,2,7,2,5,6,8,9,5,1,1,2,8,2,8,2,9,3,1,2,6,4,3,8,5,5,9,2,7,7,3,5,0,0,5,7,9,1,5,4,0,3 };//Create random array to re reduced.
int mask[100] = {0,1,0,0,1,0,1,1,1,0,1,1,1,1,1,1,1,1,0,0,1,1,1,1,0,1,1,1,1,1,1,1,0,1,1,0,1,1,1,1,1,1,0,1,1,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,0,1,1,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,1,1,1,0,1,1,1,1 };//Create mask.
double* vector_G = NULL;//Data in GPU.
int* mask_G = NULL;//Mask in GPU.
double* reducedVector_G = NULL;//Variable to store reduced data vector.
double* reducedVector = (double*)malloc(100 * sizeof(double));//Variable to store reduced data vector in RAM.
size_t countOfValidElements = 0;//Number of non-zero elements in the mask counted by the function as by-product.
cudaMalloc((void**)&vector_G, 100 * sizeof(double));//Allocate memory for the data.
cudaMalloc((void**)&mask_G, 100 * sizeof(int));//Allocate memory for the mask.
cudaMemcpy(vector_G, vector, 100 * sizeof(double), cudaMemcpyHostToDevice);//Copy data to the GPU.
cudaMemcpy(mask_G, mask, 100 * sizeof(int), cudaMemcpyHostToDevice);//Copy mask to the GPU.
reducedVector_G = ReduceVectorByMask_D_G(vector_G, mask_G, 100, &countOfValidElements);//Reduce vector by mask in GPU and get overall number of elements in output.
cudaMemcpy(reducedVector, reducedVector_G, countOfValidElements * sizeof(double), cudaMemcpyDeviceToHost);//Copy result to the RAM.
for (size_t i = 0; i < countOfValidElements; i++)//Write results to the console.
{
printf("%d,", (int)reducedVector[i]);//Print element separated.
}
cudaFree(vector_G);//Clear data from memory.
cudaFree(mask_G);//Clear data from memory.
cudaFree(reducedVector_G);//Clear data from memory.
free(reducedVector);//Clear data from memory.
}
Is there any simple way to fix my code in the function ReduceVectorByMask_D_G_G, or is there any other method to get the array reduced by mask in CUDA?