One possible approach would be to use a ring buffer, atomically updated. I don't know if this is "more elegant" than whatever you are currently doing. I don't know what you mean by __device__ printf
overload. If you are still using the CUDA printf-from-kernel facility (probably not) then that has a variety of limits that prevent arbitrary asynchronous usage as you seem to be asking for.
Also, based on the comments, you seem to want some sort of method to wake a sleeping host thread (I guess? I'm not sure what "how do I wake up the host when data is ready?" means.) I don't know of a way to wake a sleeping host thread based on CUDA activity, e.g. a changing memory location (with no polling of any kind). You could of course use a thread that sleeps and wakes occasionally to check if data is ready. I don't imagine I need to provide an example of that; it has little to do with CUDA. And it seems to me that it is still "polling". The only thing I thought of to investigate was to use the driver API cuStreamWaitValue32 or similar, and then see what the thread behavior was like if I selected cudaDeviceScheduleYield. But I haven't done that (although I left a placeholder in the class in case I get motivated to investigate).
Anyway here is an example based on polling:
$ cat t2227.cu
#include <iostream>
#include <cassert>
#include <limits>
// some usage rules:
// kernel producing data must be launched into a created stream
// host doing get must continue to do get as long as returned value is
// status_new_data
// host can go back to polling when returned value from get is status_ok
//
// Device-to-host ring buffer.
// Device threads publish items with put(); the host polls poll() and drains
// with get(). The device-side index pair {first,last} is updated with a single
// 64-bit atomicCAS, so concurrent producers reserve non-overlapping slot runs.
// Two slots are always kept empty so full and empty states are distinguishable.
// Usage rules: the producing kernel must run in a created (non-default) stream;
// after get() returns status_new_data the host must keep calling get() until
// it returns status_ok, then it may resume polling.
template <typename T>
class D2HRingBuffer
{
// status/error codes (plain #defines so kernel and host test code can use them too)
#define status_done 2
#define status_new_data 1
#define status_ok 0
#define err_illegal_size -1
T *buffer;                // device storage, buffer_size+2 slots
struct idxs {
unsigned first;           // index of first empty slot in ring buffer
unsigned last;            // index of last empty slot in ring buffer
} *idx;                   // device-resident index pair, updated via 64-bit CAS
volatile idxs *vidx;      // volatile alias of idx for device-side re-reads
volatile char *status;    // pinned host flag: device sets when new data is published
volatile char *finished;  // pinned host flag: device sets when production is done
unsigned buffer_size;     // usable capacity (excludes the 2 always-empty slots)
cudaStream_t get_stream;  // dedicated stream for host get() traffic
public:
~D2HRingBuffer(){cudaFree(buffer); cudaFreeHost((void *)status); cudaFreeHost((void *)finished); cudaFree(idx); cudaStreamDestroy(get_stream);}
// size: usable capacity. Allocates size+2 device slots, two pinned host flags,
// and the device index pair; asserts on any allocation failure.
D2HRingBuffer(unsigned size){
// the 64-bit CAS in put() and the 8-byte copies in get() treat the
// {first,last} pair as one unsigned long long — make that assumption explicit
static_assert(sizeof(idxs) == sizeof(unsigned long long), "idxs must pack into 64 bits");
if (size == 0) {std::cout << "Buffer Size of zero not allowed." << std::endl; assert(0);}
// there must always be 2 empty slots in ring buffer
// size is the size not including these 2 empty slots
if (size > std::numeric_limits<unsigned int>::max()-2) {std::cout << "Buffer Size too large." << std::endl; assert(0);}
cudaError_t err = cudaMalloc(&buffer, (size+2)*sizeof(T));
if (err != cudaSuccess) {std::cout << "Buffer allocation failure: " << cudaGetErrorString(err) << std::endl; assert(0);}
buffer_size = size;
err = cudaHostAlloc(&status, sizeof(status[0]),cudaHostAllocDefault);
if (err != cudaSuccess) {std::cout << "Status allocation failure: " << cudaGetErrorString(err) << std::endl; assert(0);}
*status = status_ok;
err = cudaHostAlloc(&finished, sizeof(finished[0]),cudaHostAllocDefault);
if (err != cudaSuccess) {std::cout << "Finished allocation failure: " << cudaGetErrorString(err) << std::endl; assert(0);}
*finished = status_ok;
err = cudaMalloc(&idx, sizeof(idxs));
if (err != cudaSuccess) {std::cout << "idx allocation failure: " << cudaGetErrorString(err) << std::endl; assert(0);}
err = cudaMemset(idx, 0, sizeof(idxs));
if (err != cudaSuccess) {std::cout << "idx clear failure: " << cudaGetErrorString(err) << std::endl; assert(0);}
// initialize first to 1 by writing the low byte (assumes little-endian layout
// of idxs.first at offset 0 — TODO confirm on any non-x86/ARM-LE target)
err = cudaMemset(idx, 1, 1);
if (err != cudaSuccess) {std::cout << "idx init failure: " << cudaGetErrorString(err) << std::endl; assert(0);}
vidx = (volatile idxs *)idx;
err = cudaStreamCreate(&get_stream);
if (err != cudaSuccess) {std::cout << "stream create failure: " << cudaGetErrorString(err) << std::endl; assert(0);}
}
// Device-side producer: copies size items from data into the ring.
// Spins until space is available; reserves the run by CAS-advancing first,
// then fills the slots and raises the host-visible status flag.
// Returns status_ok, or err_illegal_size if size exceeds capacity.
__device__ int put(T *data, unsigned size){
if (size == 0) return status_ok;
if (size > buffer_size) return err_illegal_size;
bool done = false;
while(!done){
unsigned first = vidx->first;
unsigned last = vidx->last;
unsigned avail, used;
if (first>last) {used = first-last-1; avail = buffer_size-used;}
else {avail = last-first-1; used = buffer_size-avail;}
if (avail >= size){
// advance first by size, wrapping modulo buffer_size+2
unsigned long long new_first = (unsigned long long)first+size;
if (new_first > (buffer_size+1)) new_first -= (buffer_size+2);
unsigned long long ni = (((unsigned long long)last)<<32)+new_first;
unsigned long long oi = (((unsigned long long)last)<<32)+first;
// reserve the slot run only if nobody moved either index meanwhile
unsigned long long old = atomicCAS((unsigned long long *)idx, oi, ni);
if (old == oi) {
done = true;
unsigned j = first;
for (unsigned i = 0; i < size; i++){
if (j == buffer_size+2) j = 0;
buffer[j++] = data[i];}
// publish the data before the host can observe the flag
__threadfence_system();
*status = status_new_data;
}
}
}
return status_ok;
}
// Device-side end-of-production signal: nudge status so a waiting host wakes,
// fence, then raise finished.
__device__ int signal_done(){*status = status_new_data; __threadfence_system(); *finished = status_done; return status_ok;}
// True when the ring holds no data: first is the slot right after last
// (including the wrapped case). Matches the empty test used by get().
// (Fixed: previously referenced nonexistent members next_empty/first_full.)
__device__ bool buffer_empty(){unsigned f = vidx->first; unsigned l = vidx->last; return (f == (l+1)) || ((f == 0) && (l == (buffer_size+1)));}
// Host-side consumer: copies up to max_size ready items into data and reports
// the count in size. Returns status_new_data if anything was read, status_ok
// if the ring was empty. NOTE(review): the cudaMemcpyAsync calls target
// pageable host memory, which the runtime services synchronously — the data
// is valid on return; verify if data is ever made pinned.
__host__ int get(T *data, unsigned max_size, unsigned &size){
idxs my_idx;
*status = status_ok;
// snapshot both indices with one 8-byte copy so they are mutually consistent
cudaError_t err = cudaMemcpyAsync(&my_idx, idx, sizeof(unsigned long long), cudaMemcpyDeviceToHost, get_stream);
if (err != cudaSuccess) {std::cout << "get index failure: " << cudaGetErrorString(err) << std::endl; assert(0);}
// check if buffer is empty
if (my_idx.first == (my_idx.last+1)) {size = 0; return status_ok;}
if ((my_idx.first == 0)&&(my_idx.last == (buffer_size+1))) {size = 0; return status_ok;}
unsigned ready;
unsigned long long new_last;
if (my_idx.first > my_idx.last) ready = my_idx.first-my_idx.last-1;
else {ready = buffer_size+1 - (my_idx.last - my_idx.first);}
if (ready > max_size) ready = max_size;
bool wrap = ((ready+my_idx.last) > (buffer_size+1));
if (wrap){// transfer in 2 parts: tail of the ring, then the wrapped head
unsigned first_part = buffer_size+1-my_idx.last;
unsigned second_part = ready-first_part;
err = cudaMemcpyAsync(data, buffer+my_idx.last+1, sizeof(T)*first_part, cudaMemcpyDeviceToHost, get_stream);
if (err != cudaSuccess) {std::cout << "get read failure 1: " << cudaGetErrorString(err) << std::endl; assert(0);}
err = cudaMemcpyAsync(data+first_part, buffer, sizeof(T)*second_part, cudaMemcpyDeviceToHost, get_stream);
if (err != cudaSuccess) {std::cout << "get read failure 2: " << cudaGetErrorString(err) << std::endl; assert(0);}
new_last = second_part-1;
}
else {
err = cudaMemcpyAsync(data, buffer+my_idx.last+1, sizeof(T)*ready, cudaMemcpyDeviceToHost, get_stream);
if (err != cudaSuccess) {std::cout << "get read failure: " << cudaGetErrorString(err) << std::endl; assert(0);}
new_last = (unsigned long long)my_idx.last+ready;
}
// publish the new last (second unsigned of the pair); only the host writes last,
// so a plain 4-byte store does not race with the device-side CAS on {first,last}
err = cudaMemcpyAsync((unsigned char *)idx+sizeof(unsigned), &new_last, sizeof(unsigned), cudaMemcpyHostToDevice, get_stream);
if (err != cudaSuccess) {std::cout << "set index failure: " << cudaGetErrorString(err) << std::endl; assert(0);}
size = ready;
return status_new_data;
}
// Host-side poll: status_new_data if the device published data, status_done if
// production finished, else status_ok.
__host__ int poll(){ if (*status == status_new_data) return status_new_data; if (*finished == status_done) return status_done; return status_ok;};
// Placeholder for a future cuStreamWaitValue32-based wait; currently a no-op.
// (Fixed: previously had no return statement in a non-void function — UB.)
__host__ int stream_wait(){return status_ok;};
};
// end of ring buffer class
// Producer kernel (grid-stride): for every index t divisible by test_val, each
// owning thread publishes 5 identical records of (global_thread_id<<32 | t)
// into the ring buffer. When every block has finished producing, the last one
// to increment blk_count signals the host via signal_done().
template <typename T>
__global__ void k1(D2HRingBuffer<T> *a, int *blk_count, int N, int test_val){
const int gtid = threadIdx.x + blockDim.x * blockIdx.x;
const int stride = gridDim.x * blockDim.x;
T payload[5];
for (int t = gtid; t < N; t += stride)
if ((t % test_val) == 0){
payload[0] = ((unsigned long long)gtid << 32) + t;
payload[4] = payload[3] = payload[2] = payload[1] = payload[0];
int rc = a->put(payload, 5);
assert(rc == status_ok);
}
__syncthreads();
// one representative per block; the (gridDim.x-1)-th arrival is the last block
if (threadIdx.x == 0){
int arrival = atomicAdd(blk_count, 1);
if (arrival == gridDim.x - 1) a->signal_done();
}
}
// mt: element type carried through the ring buffer (64 bits: thread id | index)
using mt = unsigned long long;
const int nTPB = 1024;  // threads per block for the producer kernel
const int nBLK = 80;    // blocks in the producer grid
// Demo driver: launch the producer kernel asynchronously, poll the ring buffer
// from the host, drain everything, and compare total items received against
// the count expected from the t % test_val == 0 selection.
// argv[1] (optional): N, the index range the kernel sweeps (default 1000).
int main(int argc, char *argv[]){
  int N = 1000;
  if (argc > 1) N = atoi(argv[1]);
  mt *b = new mt[N];
  const int test_val = 123;
  D2HRingBuffer<mt> a(33), *d_a;
  int *d_bc;
  // error-check every setup call (fixed: these results were ignored)
  cudaError_t err = cudaMalloc(&d_bc, sizeof(d_bc[0]));
  if (err != cudaSuccess) {std::cout << "d_bc alloc failure: " << cudaGetErrorString(err) << std::endl; return 1;}
  err = cudaMemset(d_bc, 0, sizeof(d_bc[0]));
  if (err != cudaSuccess) {std::cout << "d_bc init failure: " << cudaGetErrorString(err) << std::endl; return 1;}
  err = cudaMalloc(&d_a, sizeof(D2HRingBuffer<mt>));
  if (err != cudaSuccess) {std::cout << "d_a alloc failure: " << cudaGetErrorString(err) << std::endl; return 1;}
  // shallow-copy the host object to the device; safe because the class has no
  // virtual functions and its pointers are device/pinned allocations
  err = cudaMemcpy(d_a, &a, sizeof(D2HRingBuffer<mt>), cudaMemcpyHostToDevice);
  if (err != cudaSuccess) {std::cout << "d_a copy failure: " << cudaGetErrorString(err) << std::endl; return 1;}
  cudaStream_t sk;
  err = cudaStreamCreate(&sk);
  if (err != cudaSuccess) {std::cout << "stream create failure: " << cudaGetErrorString(err) << std::endl; return 1;}
  // producer must run in a created stream per the class usage rules
  k1<<<nBLK,nTPB, 0, sk>>>(d_a, d_bc, N, test_val);
  unsigned total_size = 0;
  // drain: keep calling get() until it reports an empty buffer (size == 0)
  auto drain = [&](){
    unsigned returned_size = 0;
    do {
      a.get(b+total_size, N-total_size, returned_size);
      total_size += returned_size;
    } while (returned_size != 0);
  };
  while(a.poll() != status_done) drain();
  drain();  // one final sweep after status_done, in case data raced the flag
  cudaDeviceSynchronize();
  err = cudaGetLastError();
  if (err != cudaSuccess) std::cout << "Err: " << cudaGetErrorString(err) << std::endl;
  std::cout << "total size: " << total_size << std::endl;
  int test = 0;
  for (int i = 0; i < N; i++) if ((i%test_val) == 0) test++;
  std::cout << "expected size: " << test*5 << std::endl;
  // release resources (fixed: b, d_bc, d_a, and sk previously leaked)
  cudaStreamDestroy(sk);
  cudaFree(d_bc);
  cudaFree(d_a);
  delete [] b;
}
$ nvcc -o t2227 t2227.cu
$ ./t2227 1000000
total size: 40655
expected size: 40655
$
This is lightly tested code, not much more than you see here, so it should not be considered defect free without careful testing. Windows WDDM might require some extra nudges. Enable hardware GPU scheduling, if possible.