0

I want to implement a thread that can accept function pointers from a main thread and execute them serially. My idea was to use a struct that keeps the function pointer and its object and keep pushing it to a queue. This can be encapsulated in a class. The task thread can then pop from the queue and process it. I also need to synchronize it(so it doesnt block the main thread?), so I was thinking of using a semaphore. Although I have a decent idea of the structure of the program, I am having trouble coding this up, especially the threading and semaphore sync in C++11. It'd be great if someone can suggest an outline by which I can go about implementing this.

EDIT: The duplicate question answers the question about creating a thread pool. It looks like multiple threads are being created to do some work. I only need one thread that can queue function pointers and process them in the order they are received.

rookie
  • 1,168
  • 3
  • 14
  • 25
  • 'I also need to synchronize it(so it doesnt block the main thread?' it won't. Function calls/returns do not change thread context. 'accept function pointers from a main thread and execute them serially' - code is not owned by threads. If there is some function, any thread can call it. For clarity, however, if a function in the unit with main() is called from multiple threads, you should comment it as suc so that any other users can correctly handle any interations with data outside the function. – Martin James Feb 15 '18 at 07:01
  • Thanks for the comment. I understand the code is not owned by the threads. What I meant by serially executing the functions is that the thread can only process one function at a time. After it returns, the thread can process the next function. Hence the need to store the fn pointers in a queue. If there are no items in the queue, the thread needs to sleep/suspend instead of spinning. For this purpose, I would need some mechanism, like a semaphore I guess? – rookie Feb 15 '18 at 07:11
  • I wanted to give a prototype, but I can't reply since your question is already marked as duplicate – seccpur Feb 15 '18 at 07:19
  • I have edited to clarify my question. Hope someone can unlock it. – rookie Feb 15 '18 at 07:29
  • @seccpur : I have reopened. I agree it was not a duplicate of the listed question. – Martin Bonner supports Monica Feb 15 '18 at 07:35
  • @rookie It may be a duplicate of other questions though - it is quite a simple threading question (search for "single producer, single consumer"). I would suggest a condition variable rather than a semaphore (it doesn't matter what your threading question is; a semaphore is almost never the answer). – Martin Bonner supports Monica Feb 15 '18 at 07:36
  • @rookie sure - a semahore an be used to count the pointers in the queue and a mutex to protect the queue during push/pop. That's a 'standard' producer-consumer queue, and will work fine. When the queue is empty, the threads will block efficiently on the semaphore. – Martin James Feb 15 '18 at 11:15
  • @MartinBonner '(it doesn't matter what your threading question is; a semaphore is almost never the answer' well, I disagee with that. The 'classic' guaranteed-to-work unbounded producer-consumer queue uses one semaphore and a mutex. The 'classic' guaranteed-to-work bounded producer-consumer queue uses two semaphores and a mutex. There are no OS I know of that do not support semaphores, but some that do not support condvars. Even Windows did not support condvars until, I thnk, W7. – Martin James Feb 15 '18 at 11:21
  • @MartinJames : I still have scars from PrimOs where a semaphore was the only synchronization primitive. This may be clouding my judgement (after all, I only stopped using it in 1990, and the horror is still fairly fresh in my memory). – Martin Bonner supports Monica Feb 15 '18 at 11:46
  • @MartinJames - thanks for your insights, they were helpful. I agree it kind of boils down to a producer-consumer problem. C++11 not having an explicit 'semaphore', mutex+condition variable would be a way to implement it. Thanks again! – rookie Feb 15 '18 at 14:39

2 Answers2

2

Check this code snippet, I have implemented without using a class though. See if it helps a bit. Conditional variable could be avoided here, but I want the reader thread to poll only when there is a signal from the writer so that CPU cycles in the reader won't be wasted.

#include <iostream>
#include <functional>
#include <mutex>
#include <thread>
#include <queue>
#include <chrono>
#include <condition_variable>

using namespace std;

typedef function<void(void)> task_t;

queue<task_t> tasks;
mutex mu;
condition_variable cv;

bool stop = false;

void writer()
{
    while(!stop)
    {
        {
            unique_lock<mutex> lock(mu);
            task_t task = [](){ this_thread::sleep_for(chrono::milliseconds(100ms));   };
            tasks.push(task);
            cv.notify_one();
        }

        this_thread::sleep_for(chrono::milliseconds(500ms)); // writes every 500ms
    }
}

void reader()
{
    while(!stop)
    {
        unique_lock<mutex> lock(mu);
        cv.wait(lock,[]() { return !stop;});  
        while( !tasks.empty() )
        {

            auto task = tasks.front();            
            tasks.pop();
            lock.unlock();
            task();
            lock.lock();
        }

    }
}

int main()
{
    thread writer_thread([]() { writer();}  );
    thread reader_thread([]() { reader();}  );

    this_thread::sleep_for(chrono::seconds(3s)); // main other task

    stop = true;


    writer_thread.join();
    reader_thread.join();
}
seccpur
  • 4,996
  • 2
  • 13
  • 21
  • doesn't this block the queue while executing the task? You should probably call `lock.unlock();` before `task()` and `lock.lock();` afterwards? – PeterT Feb 15 '18 at 07:50
  • or use something like a reverse_lock `{boost::reverse_lock relocker; task(); }` – PeterT Feb 15 '18 at 07:57
  • I used a scope before the lock ( see the extra curly braces ) so that it locks only during push operation and not during the long sleep – seccpur Feb 15 '18 at 08:12
  • yes, but the mutex gets held by the consumer the whole time while it's executing the task, you can't add new tasks this way while a task is executing – PeterT Feb 15 '18 at 08:13
  • @Peter: Thanks, didn't see that, I try to modify the consumer side. EDITED . – seccpur Feb 15 '18 at 08:19
  • 1
    see this is why threading is hard. Now the consumer will potentially not get notified if he's already working on a task when the notification gets send out. It needs to still be `while( !tasks.empty() )` and you need to re-lock after you finish executing the task – PeterT Feb 15 '18 at 08:25
  • Just use a semaphore and a mutex. Unless there is some specific reason to not do so, why not go with the mechanism that is sure to work? – Martin James Feb 15 '18 at 11:17
1

Your problem has 2 parts. Storing the list of jobs and manipulating the jobs list in a threadsafe way.

For the first part, look into std::function, std::bind, and std::ref.

For the second part, this is similar to the producer/consumer problem. You can implement a semaphore using std::mutexand std::condition_variable.

There's a hint/outline. Now my full answer...

Step 1)

Store your function pointers in a queue of std::function.

std::queue<std::function<void()>>

Each element in the queue is a function that takes no arguments and returns void.

For functions that take arguments, use std::bind to bind the arguments.

void testfunc(int n);
...
int mynum = 5;
std::function<void()> f = std::bind(testfunction, mynum);

When f is invoked, i.e. f(), 5 will be passed as argument 1 to testfunc. std::bind copies mynum by value immediately.

You probably will want to be able to pass variables by reference as well. This is useful for getting results back from functions as well as passing in shared synchronization devices like semaphores and conditions. Use std::ref, the reference wrapper.

void testfunc2(int& n);  // function takes n by ref
...
int a = 5;
std::function<void()> f = std::bind(testfunction, std::ref(a));

std::function and std::bind can work with any callables--functions, functors, or lambdas--which is pretty neat!

Step 2)

A worker thread dequeues while the queue is non-empty. Your code should look similar to the producer/consumer problem.

class AsyncWorker
{
    ...

public:
    // called by main thread
    AddJob(std::function<void()> f)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(std::move(f));
            ++m_numJobs;
        }
        m_condition.notify_one();  // It's good style to call notify_one when not holding the lock. 
    }

private:
    worker_main()
    {
        while(!m_exitCondition)
            doJob();
    }

    void doJob()
    {
        std::function<void()> f;
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            while (m_numJobs == 0)
                m_condition.wait(lock);

            if (m_exitCondition)
                return;

            f = std::move(m_queue.front());
            m_queue.pop();
            --m_numJobs;
        }
        f();
    }

    ...

Note 1: The synchronization code...with m_mutex, m_condition, and m_numJobs...is essentially what you have to use to implement a semaphore in C++'11. What I did here is more efficient than using a separate semaphore class because only 1 lock is locked. (A semaphore would have its own lock and you would still have to lock the shared queue).

Note 2: You can easily add additional worker threads.

Note 3: m_exitCondition in my example is an std::atomic<bool>

Actually setting up the AddJob function in a polymorphic way gets into C++'11 variadic templates and perfect forwarding...

class AsyncWorker
{
    ...

public:
    // called by main thread
    template <typename FUNCTOR, typename... ARGS>
    AddJob(FUNCTOR&& functor, ARGS&&... args)
    {
        std::function<void()> f(std::bind(std::forward<FUNCTOR>(functor), std::forward<ARGS&&>(args)...));
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(std::move(f));
            ++m_numJobs;
        }
        m_condition.notify_one();  // It's good style to call notify_one when not holding the lock. 
    }

I think it may work if you just used pass-by-value instead of using the forwarding references, but I haven't tested this, while I know the perfect forwarding works great. Avoiding perfect forwarding may make the concept slightly less confusing but the code won't be much different...

Humphrey Winnebago
  • 1,512
  • 8
  • 15