0

I am writing a Python C Extension for Graph-related computations.

One routine computes the edge list of a certain class of Graphs and since this is "embarassingly" parallelizable, I rewrote it to use a Threadpool.

At first I used OpenMP which was fine, however, I couldn't continue using it since its Threadpool didn't persist between Python calls of the routine.

So, I started to roll my own using the pthreads library.

Now, it almost always works, except:

  • almost all the jobs are almost always executed by a single thread, however, not always, I have encountered a fair distribution a couple of times, so I am not sure if this is an actual problem or if I have to fiddle with the scheduler to get more consistent distribution of workload

  • from time to time I get a segfault. Now, this doesn't happen in the multi-threaded(MT) code, this happens in the single-threaded(ST) code after the MT code. The MT code basically computes offsets into an array which the ST code uses for further processing. And like in 1% of cases, the memory that holds the offsets is not fully initialized, therefore the ST code goes out-of-bounds. I have no idea what causes this rare error. I have a unit test which compares the result to a Python computed result and it passes like 99% of the time, so it can't be that uninitialized memory is more often correct than wrong. Sometimes, the ST code doesn't go even out of bounds, but then creates a huge numpy array so that the interpreter takes quite a long time executing the function call

Well, I used some printfs to see that the offsets memory simply doesn't get fully initialized in the cases where the segfault occurs. Some offsets are then some garbage values in the range of a million or so, hence the out of bound memory access. In the 99% cases the memory is initialized exactly as expected.

The MT code also doesn't execute any Python code, however, it does read from the memory of a numpy array. But as far as I can tell from Python C Extension documentation that shouldn't cause any problems.

I am not quite sure what else to do, the code looks correct, it also works almost always.

I suspect there has to be something wrong with my Threadpool implementation since I didn't have this problem single-threaded or with OpenMP.

The scheduling behaviour also seems strange to me, I would have expected the workload to either be fairly equally distributed all the time OR it would simply be executed by one thread all the time and I messed up a mutex lock somewhere, but again, it changes from time time and it doesn't seem to be related with the error, I have seen the error occur in cases of fair distribution and single distribution.

I am not sure if code would help much at this point, I will provide some if requested.

Thank you.

UPDATE: Here is the code that deals with Job execution and synchronization between worker threads and main thread (I left out some struct declarations since I think they would only clutter up the code)

typedef struct{
    volatile uint capacity;
    volatile void** volatile data;

    volatile uint cursor;
    pthread_mutex_t mutex;
    pthread_cond_t empty_condition;
} Queue;

typedef struct{
    Queue* job_queue;
    void (*job_process)(void*, int ID);

    pthread_mutex_t* volatile barrier_mutex;

    pthread_mutex_t progress_mutex;
    pthread_cond_t progress_condition;
    volatile uint work_in_progress;

    volatile bool terminate;
} SynchronizationHandle;

/* after locking the queue, thread either:
    - executes job from queue asynchronously
    -- this also involves updating and signaling to main thread when job execution is in progress or finished
    - or signals to main thread that queue is empty*/

static bool WorkerThread_do_job(SynchronizationHandle* sync_handle, int ID){  
    Queue* job_queue = sync_handle->job_queue;
    pthread_mutex_lock(&(job_queue->mutex));

    if(job_queue->cursor > 0){
        // grab data from queue and give it free for other threads
        void* job_data = (void*)job_queue->data[job_queue->cursor-1];
        job_queue->cursor--;
        pthread_mutex_unlock(&(job_queue->mutex));

        pthread_mutex_lock(&(sync_handle->progress_mutex));
        sync_handle->work_in_progress++;
        pthread_mutex_unlock(&(sync_handle->progress_mutex));

        sync_handle->job_process(job_data, ID);

        pthread_mutex_lock(&(sync_handle->progress_mutex));
        sync_handle->work_in_progress--;
        if(sync_handle->work_in_progress == 0)
            pthread_cond_signal(&(sync_handle->progress_condition));
        pthread_mutex_unlock(&(sync_handle->progress_mutex));

        return true;
    }
    else{
        pthread_cond_signal(&(job_queue->empty_condition));
        pthread_mutex_unlock(&(job_queue->mutex));
    
        return false;
    }
}

// function for worker threads
// the 2 nested loops correspond to a waiting phase and a work phase
static void* worker_thread(void* arguments){
    WorkerThread* worker_thread = (WorkerThread*) arguments;
    SynchronizationHandle* sync_handle = worker_thread->sync_handle;

    // the outer loop deals with waiting on the main thread to start work or terminate
    while(true){
        // adress is saved to local variable so that main thread can change adress to other mutex without race condition
        pthread_mutex_t* const barrier_mutex = (pthread_mutex_t* const)sync_handle->barrier_mutex;
        // try to lock mutex and go to sleep and wait for main thread to unlock it
        pthread_mutex_lock(barrier_mutex);
        // unlock it for other threads
        pthread_mutex_unlock(barrier_mutex);

        // the inner loop executes jobs from the work queue and checks inbetween if it should terminate
        while(true){
            if(sync_handle->terminate)
                pthread_exit(0);
            if(!WorkerThread_do_job(sync_handle, worker_thread->ID))
                break;
        }
    }
}

typedef struct{
    pthread_t* thread_handles;
    WorkerThread* worker_threads;
    uint worker_threads_count;

    SynchronizationHandle* sync_handle;
    pthread_mutex_t barrier_mutexes[2];
    uint barrier_mutex_cursor;
} ThreadPool;

static void ThreadPool_wakeup_workers(ThreadPool* pool){
    // ASSUMPTION: thread executing already owns pool->barrier_mutexes + pool->barrier_mutex_cursor

    // compute which mutex to switch to next
    uint offset = (pool->barrier_mutex_cursor + 1)%2;

    // lock next mutex before wake up so that worker threads can't get hold of it before main thread
    pthread_mutex_lock(pool->barrier_mutexes + offset);

    // change adress to the next mutex before unlocking previous mutex, otherwise race condition
    pool->sync_handle->barrier_mutex = pool->barrier_mutexes + offset;

    // unlocking the previous mutex "wakes up" the worker threads since they are trying to lock it
    // hence why the assumption needs to hold
    pthread_mutex_unlock(pool->barrier_mutexes + pool->barrier_mutex_cursor);
    
    pool->barrier_mutex_cursor = offset;
}

static void ThreadPool_participate(ThreadPool* pool){
    while(WorkerThread_do_job(pool->sync_handle, 0));
}

static void ThreadPool_waiton_workers(ThreadPool* pool){
    // wait until queue is empty
    pthread_mutex_lock(&(pool->sync_handle->job_queue->mutex));
    while(pool->sync_handle->job_queue->cursor > 0)
        pthread_cond_wait(&(pool->sync_handle->job_queue->empty_condition), &(pool->sync_handle->job_queue->mutex));
    pthread_mutex_unlock(&(pool->sync_handle->job_queue->mutex));

    // wait until all work in progress is finished
    pthread_mutex_lock(&(pool->sync_handle->progress_mutex));
    while(pool->sync_handle->work_in_progress > 0)
        pthread_cond_wait(&(pool->sync_handle->progress_condition), &(pool->sync_handle->progress_mutex));
    pthread_mutex_unlock(&(pool->sync_handle->progress_mutex));
}
John Bollinger
  • 160,171
  • 8
  • 81
  • 157
Manatee Pink
  • 160
  • 7
  • 1
    I'm sorry you're having trouble, You haven't given us anything specific to which we could address an answer. – John Bollinger Mar 13 '23 at 23:31
  • @JohnBollinger, thanks for responding. Are you asking for a code example? I mean, part of the problem is that I am not sure what to do or show. I cannot pinpoint a small part of the code that seems responsible. And again, the error occurs seamingly randomly and the only non-determinism in the whole process is the OS scheduler. So, I can add code, but if you have a suggestion on how I can present something more focused that would be great – Manatee Pink Mar 14 '23 at 14:55
  • Our usual expectation -- which is often a requirement in practice -- is that the question present a [mre]. I understand that the nature of your particular problem may make it difficult or impossible to prepare an MRE. If so, then SO probably is not a good venue for help in finding a resolution. I'm sorry, but I'm sure you understand that we are unlikely to be able to solve a problem arising from the details of your code without reviewing your code, and I hope you appreciate that few of us are prepared to devote our time to analyzing multi-hundred-line code samples for free. – John Bollinger Mar 14 '23 at 15:02
  • No, I get it. I'll see if I can whip up something short – Manatee Pink Mar 14 '23 at 15:42
  • @JohnBollinger, I have now added the code that I suspect would cause the problems – Manatee Pink Mar 15 '23 at 13:35
  • 1
    Almost the very first thing I see is suspicious: [`volatile` is not useful in multithreaded C programming](https://stackoverflow.com/q/2484980/2402272), and seeing a bunch of `volatile` structure members leads me to suppose that you are assuming that accessing `volatile` data is inherently thread-safe when in fact, it is not. – John Bollinger Mar 15 '23 at 14:23
  • I am not assuming that. All volatile members are accessed inside a mutex. I haven't read in any pthread documentation though that memory barriers are also used so I use volatile to make sure reads and writes aren't mistakenly optimized out. Can I safely remove volatile, do pthread mutexes provide memory barriers as well? – Manatee Pink Mar 15 '23 at 14:30
  • Again, `volatile` is not useful in multithreaded C programming. If your code is free of data races then a conforming C compiler will not improperly optimize out memory accesses. Pthreads mutexes are among the tools you can employ to avoid data races. – John Bollinger Mar 15 '23 at 14:35
  • Well, removing volatile didn't remove the error, but it seems to have made it rarer? Before it took like 100 tries to get an error, now it takes like a 1000. Also, no segfault sofar, only a huge numpy array (same underlying cause though, the offsets are used to compute the size of the array whether the uninitialized memory has small enough values to avoid an out of bounds error doesn't matter). Not sure what to make of this. Any other ideas? – Manatee Pink Mar 15 '23 at 15:18

1 Answers1

1

The code presented has good bones, but it nevertheless has numerous synchronization issues.

It contains some probable data races:

  • In worker_thread():

    • pointer sync_handle->barrier_mutex is accessed (read) without mutex protection or other effective synchronization. It looks like this pointer will be written by ThreadPool_wakeup_workers(), so a data race occurs if ThreadPool_wakeup_workers() is called while any worker threads are alive.

    • sync_handle->terminate is accessed (read) without mutex protection or other effective synchronization. I don't see any code that writes to this object, but I presume that such writes do take place somewhere, while worker threads are alive, which creates a data race. Maybe you just want to make this one _Atomic.

    I assume that *worker_thread is not accessed by other threads (via a different pointer) while worker_thread() is running, else there would be more.

Also some other possible data races:

  • WorkerThread_do_job() reads pointer sync_handle->job_queue without mutex protection or any other effective synchronization. If the pointer is also written while any of the worker threads are alive, then that creates a data race.

  • WorkerThread_do_job() reads pointer sync_handle->job_process without mutex protection or any other effective synchronization. If the pointer is also written while any of the worker threads are alive, then that creates a data race.

  • ThreadPool_wakeup_workers() accesses pool->barrier_mutex_cursor and pool->barrier_mutexes, apparently without a consistent choice of mutex to protect that access. If any other thread writes to these members during the lifetime of the thread pool management thread then that may create a data race.

  • ThreadPool_waiton_workers() reads pointers pool->sync_handle and pool->sync_handle->job_queue without mutex protection or any other effective synchronization. If either of these is also written by another thread while the pool management thread is alive, then that creates a data race.

Additionally,

  • Your double-mutex barrier is potentially racy (but does not in this way have a data race per se) in that it assumes that all the worker threads are in fact blocked trying to acquire the current barrier mutex. It's not clear who calls this function or when, but if it doesn't do anything to ensure that the workers have in fact all reached the barrier mutex first, then the workers can fall out of step with each other.

    I don't particularly like the double-mutex approach in general. I think a better and more reliable barrier could be constructed from one mutex, a condition variable, and a couple of int variables. Basically, you want the threads involved to be able to determine whether to proceed based on data, rather than based on acquiring a mutex. Or at least you want the pool manager to be able to make such determination.

  • WorkerThread_do_job() is racy with respect to ThreadPool_waiton_workers() in that the decrement of job_queue->cursor and increment of sync_handle->work_in_progress are not performed as an atomic unit. If a thread in ThreadPool_waiton_workers() runs between those events, then it can see the queue cursor and progress counter both at zero, and therefore think that all the workers are done, when in fact, one or more workers are instead about to increment the progress counter.

  • In worker_thread(), a pointer of type pthread_mutex_t * const is passed to pthread_mutex_lock() and pthread_mutex_unlock(), which expect instead a parameter of type pthread_mutex_t *. Your compiler should be warning about that. This is semantically incorrect because one must suppose that these two functions modify the mutex to which the argument points, so a pointer to const data is not a suitable argument.

  • As I wrote in comments, volatile has no role to play in multithreaded programming.


None of that directly addresses the specific issues you raised, that one thread usually takes all the jobs, and that occasionally the single-threaded code running after completion of the multi-threaded workload fails with a segfault.

With respect to the former, I speculate that the job_process() function does very little work per call, in which case one thread might be able to cycle the inner loop in worker_thread() many times while mostly blocking other threads by repeatedly acquiring (and releasing) the various mutexes needed. Pthreads mutexes do not ensure fair scheduling by default, so if a thread is quick enough, it can release a mutex and then be the next thread to acquire it, even though other threads are also contending for it. This is in fact reasonably likely when the time between release and attempted reacquisition is very short, and such mutex-hogging is a known behavior in such cases.

If your job_process() is indeed so fine grained, however, then that's a deeper problem. Fair scheduling for the mutex would probably subject you to high overhead from all the mutex operations. One possible mitigation in that case would be to make job_process() do more work per call.

With respect to the segfaults, I speculate that the memory accesses by job_process() are not (reliably) synchronized relative to those of the thread that continues past the parallel section, so that there is a data race there, too.

John Bollinger
  • 160,171
  • 8
  • 81
  • 157
  • thanks for all the pointers, much appreciated. It seems to me that I simply need to start over and replace the double mitex approach as you suggested. I am not sure if I should accept this answer since it didn't actually find the error, but then again, the question makes that hard to do anyway. What would you say? – Manatee Pink Mar 17 '23 at 07:30
  • 1
    @ManateePink, that's ultimately up to you. Accepting this answer would make it virtually certain that you will not get any others, though I doubt you will get any more either way. Note well that accepting answer doesn't mean "this is correct"; rather, it means "I'm satisfied". And selfishly speaking, *of course* you should accept this answer :-) – John Bollinger Mar 17 '23 at 12:28