I have a worker thread processing a queue of work items. Work items might not be processable right now, so the worker thread might push them back into the queue.

void* workerFunc(void* arg) {
    WorkItem* item = NULL;

    while(true) {
        {
            // Named guard object, so the lock is held until the closing brace.
            scoped_lock lock(&queueMutex);
            while(workerRunning && workQueue.empty())
                pthread_cond_wait(&queueCondition, &queueMutex);
            if(!workerRunning)
                break;

            item = workQueue.front();
            workQueue.pop();
        }

        // process item, may take a while (therefore no lock here),
        // may also be considered unprocessable

        if(unprocessable) {
            scoped_lock lock(&queueMutex);
            workQueue.push(item);
        }
    }
    return NULL;
}

Now I need to do the following: from time to time, I need to scan through the work queue and remove items that are no longer needed (from the same thread that enqueues work items). I cannot use queueMutex for this, because I might miss the item that is currently being processed, so I need a way to pause the whole processing thread at a point where all undone items are actually inside the queue (preferably right at the top of the while loop).

I thought about a second bool variable ("paused") in combination with another mutex and condition variable, but then the special case where the worker is waiting for a signal on queueCondition has to be handled; the pthread_cond_wait() call would effectively have to unlock and relock both mutexes.

I guess there must be a simple solution to this problem, but I can't come up with it. I hope some of you will be able to help me out.

Thanks a lot in advance.

Sergey K.
Pontomedon
  • If the producer (the thread that is also inserting items) is going to remove unnecessary items from the queue, it should grab the same mutex (`queueMutex`), as it is modifying the queue. It doesn't matter if the processor thread has just popped an item off the queue and is working on it: if it's unprocessed, the next time the producer scans the queue for unnecessary items, it will pick that up. – Nim Jul 02 '12 at 08:32
  • Why the condition wait while the queue is not empty? – jxh Jul 02 '12 at 08:57
  • Oh, that's a typo, I'll fix it, sorry. – Pontomedon Jul 02 '12 at 09:07
  • I think that you need to think about the synchronization needed here. It seems to me that not only do you need to let the consumer thread know that it needs to pause for the 'garbage collection', but you need to have the consumer thread signal the producer thread that the consumer is actually paused. Then you have to 'unpause' the consumer. Not a simple scenario. – Michael Burr Jul 03 '12 at 03:56
  • @MichaelBurr A `pause` variable will work. It has three values: pause not requested, pause requested, paused. The scanning thread sets it to "pause requested" and blocks on the cv until it's paused. The worker thread checks if a pause is requested in the `while` loop that waits. If one is, it sets `pause` to paused and enters a new `while` loop waiting for `pause` to go back to not requested. – David Schwartz Sep 30 '18 at 04:22

1 Answer

Basically you need to emulate WinAPI's WaitForMultipleObjects() call on POSIX. POSIX doesn't have a single API to wait for all types of events/objects as WinAPI does.

Use pthread_cond_timedwait and clock_gettime. You can refer to the paper "WaitFor API" for many implementation details.

There is some interesting code there (too much to post in the answer, but usable) that can solve your problem.
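
As a small illustration of the two calls named above (this is not the code from the paper), a timed wait on a condition variable might look like the following sketch. The helper name `waitForFlag` and its parameters are hypothetical. It assumes the condition variable uses the default clock, `CLOCK_REALTIME`, and that the caller already holds the mutex.

```cpp
#include <pthread.h>
#include <errno.h>
#include <time.h>

// Hypothetical helper: waits until *flag becomes true or timeoutMs elapse.
// Must be called with `mutex` locked. Returns the final value of *flag.
bool waitForFlag(pthread_cond_t* cond, pthread_mutex_t* mutex,
                 const bool* flag, long timeoutMs) {
    // pthread_cond_timedwait expects an absolute deadline, not a duration.
    timespec deadline;
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_sec += timeoutMs / 1000;
    deadline.tv_nsec += (timeoutMs % 1000) * 1000000L;
    if (deadline.tv_nsec >= 1000000000L) {  // normalise the nanosecond field
        ++deadline.tv_sec;
        deadline.tv_nsec -= 1000000000L;
    }

    while (!*flag) {  // loop guards against spurious wakeups
        int rc = pthread_cond_timedwait(cond, mutex, &deadline);
        if (rc != 0)  // ETIMEDOUT or another error: stop waiting
            return *flag;
    }
    return true;
}
```

A WaitForMultipleObjects-style emulation would typically funnel several events into one mutex and condition variable and call a helper like this with a predicate covering all of them.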

P.S. Refer to this question for a discussion: WaitForSingleObject and WaitForMultipleObjects equivalent in linux

Community
Sergey K.