I am writing a Python C extension for graph-related computations.
One routine computes the edge list of a certain class of graphs, and since this is "embarrassingly" parallelizable, I rewrote it to use a thread pool.
At first I used OpenMP, which worked fine; however, I couldn't keep using it because its thread pool didn't persist between Python calls to the routine.
So I started rolling my own using the pthreads library.
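For context, the OpenMP version was essentially just a parallel loop over independent per-job computations. The sketch below uses placeholder types and names, not my real code:

#include <omp.h>

typedef struct { int start; int end; } Job;   // placeholder for my real per-job data
void process_job(Job* job);                   // placeholder for the per-job edge computation

// every iteration is independent, which is what makes this embarrassingly parallel
static void compute_all_jobs_openmp(Job* jobs, int job_count){
    #pragma omp parallel for schedule(dynamic)
    for(int i = 0; i < job_count; i++)
        process_job(&jobs[i]);
}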
Now, it almost always works, except:
1. Almost all of the jobs are almost always executed by a single thread. Not always, though: I have seen a fair distribution a couple of times, so I am not sure whether this is an actual problem or whether I have to fiddle with the scheduler to get a more consistent distribution of the workload.
2. From time to time I get a segfault. This doesn't happen in the multi-threaded (MT) code; it happens in the single-threaded (ST) code that runs after it. The MT code computes offsets into an array which the ST code uses for further processing, and in roughly 1% of cases the memory holding those offsets is not fully initialized, so the ST code goes out of bounds. I have no idea what causes this rare error. I have a unit test that compares the result against a Python-computed reference and it passes about 99% of the time, so it can't be that the uninitialized memory is more often correct than wrong. Sometimes the ST code doesn't even go out of bounds, but instead creates a huge numpy array, so the interpreter takes quite a long time to execute the function call.
I used some printfs to confirm that the offsets memory simply doesn't get fully initialized in the cases where the segfault occurs. Some of the offsets are then garbage values in the range of a million or so, hence the out-of-bounds memory access. In the 99% of cases that pass, the memory is initialized exactly as expected.
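To catch this more reliably than with printfs, the offsets can be filled with a sentinel before the MT phase and verified afterwards; here is a simplified sketch of that kind of check, with placeholder names rather than my actual variables:

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define OFFSET_SENTINEL UINT32_MAX   // a value no valid offset can take (placeholder choice)

// fill the offsets array with OFFSET_SENTINEL before dispatching the MT jobs,
// then call this after the pool has finished to find slots that were never written
static void check_offsets_initialized(const uint32_t* offsets, size_t offsets_len){
    for(size_t i = 0; i < offsets_len; i++){
        if(offsets[i] == OFFSET_SENTINEL){
            fprintf(stderr, "offset %zu was never written by the MT phase\n", i);
            assert(0 && "uninitialized offset");
        }
    }
}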
The MT code also doesn't execute any Python code; it does, however, read from the memory of a numpy array. As far as I can tell from the Python C extension documentation, that shouldn't cause any problems.
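What I mean by that is, roughly, that the raw data pointer and length are taken on the main thread (with the GIL held) and only the C pointer is handed to the workers. A simplified sketch with placeholder names, assuming a 1-D int64 array:

#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
#include <Python.h>
#include <numpy/arrayobject.h>

typedef struct { const npy_int64* data; npy_intp length; } ArrayView;   // placeholder

// called on the main thread while the GIL is held; the workers only ever see the raw pointer
static ArrayView make_array_view(PyArrayObject* arr){
    ArrayView view;
    view.data   = (const npy_int64*) PyArray_DATA(arr);
    view.length = PyArray_DIM(arr, 0);
    return view;
}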
I am not quite sure what else to do; the code looks correct, and it works almost all of the time.
I suspect there has to be something wrong with my thread pool implementation, since I didn't have this problem single-threaded or with OpenMP.
The scheduling behaviour also seems strange to me. I would have expected the workload either to be distributed fairly evenly all the time, or to be executed by a single thread all the time because I messed up a mutex lock somewhere. But it changes from run to run, and it doesn't seem to be related to the error: I have seen the error occur both with a fair distribution and with a single thread doing all the work.
I am not sure if code would help much at this point; I will provide some if requested.
Thank you.
UPDATE: Here is the code that deals with job execution and synchronization between the worker threads and the main thread (I left out some struct declarations since I think they would only clutter up the code).
typedef struct{
    volatile uint capacity;
    volatile void** volatile data;
    volatile uint cursor;
    pthread_mutex_t mutex;
    pthread_cond_t empty_condition;
} Queue;
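The enqueue side is among the parts I left out; just to show how the fields above are meant to be used, a push looks roughly like this (simplified sketch, not my actual code):

static bool Queue_push(Queue* queue, void* job_data){
    bool pushed = false;
    pthread_mutex_lock(&(queue->mutex));
    if(queue->cursor < queue->capacity){
        // jobs are popped LIFO from the top, so a push simply appends at the cursor
        queue->data[queue->cursor] = job_data;
        queue->cursor++;
        pushed = true;
    }
    pthread_mutex_unlock(&(queue->mutex));
    return pushed;
}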
typedef struct{
    Queue* job_queue;
    void (*job_process)(void*, int ID);
    pthread_mutex_t* volatile barrier_mutex;
    pthread_mutex_t progress_mutex;
    pthread_cond_t progress_condition;
    volatile uint work_in_progress;
    volatile bool terminate;
} SynchronizationHandle;
/* after locking the queue, the thread either:
   - executes a job from the queue asynchronously
     -- this also involves updating and signaling to the main thread when job execution is in progress or finished
   - or signals to the main thread that the queue is empty */
static bool WorkerThread_do_job(SynchronizationHandle* sync_handle, int ID){
    Queue* job_queue = sync_handle->job_queue;
    pthread_mutex_lock(&(job_queue->mutex));
    if(job_queue->cursor > 0){
        // grab the data from the queue and release the queue for the other threads
        void* job_data = (void*)job_queue->data[job_queue->cursor-1];
        job_queue->cursor--;
        pthread_mutex_unlock(&(job_queue->mutex));

        pthread_mutex_lock(&(sync_handle->progress_mutex));
        sync_handle->work_in_progress++;
        pthread_mutex_unlock(&(sync_handle->progress_mutex));

        sync_handle->job_process(job_data, ID);

        pthread_mutex_lock(&(sync_handle->progress_mutex));
        sync_handle->work_in_progress--;
        if(sync_handle->work_in_progress == 0)
            pthread_cond_signal(&(sync_handle->progress_condition));
        pthread_mutex_unlock(&(sync_handle->progress_mutex));
        return true;
    }
    else{
        pthread_cond_signal(&(job_queue->empty_condition));
        pthread_mutex_unlock(&(job_queue->mutex));
        return false;
    }
}
// function for the worker threads
// the 2 nested loops correspond to a waiting phase and a work phase
static void* worker_thread(void* arguments){
    WorkerThread* worker_thread = (WorkerThread*) arguments;
    SynchronizationHandle* sync_handle = worker_thread->sync_handle;

    // the outer loop deals with waiting on the main thread to start work or to terminate
    while(true){
        // the address is saved to a local variable so that the main thread can change the address to the other mutex without a race condition
        pthread_mutex_t* const barrier_mutex = (pthread_mutex_t* const)sync_handle->barrier_mutex;
        // try to lock the mutex, i.e. go to sleep and wait for the main thread to unlock it
        pthread_mutex_lock(barrier_mutex);
        // unlock it for the other threads
        pthread_mutex_unlock(barrier_mutex);

        // the inner loop executes jobs from the work queue and checks in between whether it should terminate
        while(true){
            if(sync_handle->terminate)
                pthread_exit(0);
            if(!WorkerThread_do_job(sync_handle, worker_thread->ID))
                break;
        }
    }
}
typedef struct{
    pthread_t* thread_handles;
    WorkerThread* worker_threads;
    uint worker_threads_count;
    SynchronizationHandle* sync_handle;
    pthread_mutex_t barrier_mutexes[2];
    uint barrier_mutex_cursor;
} ThreadPool;
static void ThreadPool_wakeup_workers(ThreadPool* pool){
    // ASSUMPTION: the executing thread already owns pool->barrier_mutexes + pool->barrier_mutex_cursor

    // compute which mutex to switch to next
    uint offset = (pool->barrier_mutex_cursor + 1)%2;
    // lock the next mutex before the wake-up so that the worker threads can't get hold of it before the main thread
    pthread_mutex_lock(pool->barrier_mutexes + offset);
    // change the address to the next mutex before unlocking the previous mutex, otherwise race condition
    pool->sync_handle->barrier_mutex = pool->barrier_mutexes + offset;
    // unlocking the previous mutex "wakes up" the worker threads since they are trying to lock it
    // (hence why the assumption needs to hold)
    pthread_mutex_unlock(pool->barrier_mutexes + pool->barrier_mutex_cursor);
    pool->barrier_mutex_cursor = offset;
}
static void ThreadPool_participate(ThreadPool* pool){
    while(WorkerThread_do_job(pool->sync_handle, 0));
}
static void ThreadPool_waiton_workers(ThreadPool* pool){
    // wait until the queue is empty
    pthread_mutex_lock(&(pool->sync_handle->job_queue->mutex));
    while(pool->sync_handle->job_queue->cursor > 0)
        pthread_cond_wait(&(pool->sync_handle->job_queue->empty_condition), &(pool->sync_handle->job_queue->mutex));
    pthread_mutex_unlock(&(pool->sync_handle->job_queue->mutex));

    // wait until all work in progress is finished
    pthread_mutex_lock(&(pool->sync_handle->progress_mutex));
    while(pool->sync_handle->work_in_progress > 0)
        pthread_cond_wait(&(pool->sync_handle->progress_condition), &(pool->sync_handle->progress_mutex));
    pthread_mutex_unlock(&(pool->sync_handle->progress_mutex));
}