0

I'll preface this by saying that I'm delving into multithreading for the first time. Despite a lot of reading on concurrency and synchronization, I'm not readily seeing a solution for the requirements I've been given.

Using C++11 and Boost, I'm trying to figure out how to send data from a worker thread to a main thread. The worker thread is spawned at the start of the application and continuously monitors a lock free queue. Objects populate this queue at various intervals. This part is working.

Once the data is available, it needs to be processed by the main thread since another signal will be sent to the rest of the application which cannot be on a worker thread. This is what I'm having trouble with.

If I have to block the main thread through a mutex or a condition variable until the worker thread is done, how will that improve responsiveness? I might as well just stay with a single thread so I have access to the data. I must be missing something here.

I have posted a couple of questions, thinking that Boost::Asio was the way to go. There is an example of how signals and data can be sent between threads, but as the responses indicate, things quickly get overly complicated and it's not working perfectly:

How to connect signal to boost::asio::io_service when posting work on different thread?

Boost::Asio with Main/Workers threads - Can I start event loop before posting work?

After speaking with some colleagues, it was suggested that two queues be used -- one input, one output. This would be in shared space and the output queue would be populated by the worker thread. The worker thread is always going but there would need to be a Timer, probably at the application level, that would force the main thread to examine the output queue to see if there were any pending tasks.

Any ideas on where I should direct my attention? Are there any techniques or strategies that might work for what I'm trying to do? I'll be looking at Timers next.

Thanks.

Edit: This is production code for a plugin system that post-processes simulation results. We are using C++11 first wherever possible, followed by Boost. We are using Boost's lockfree::queue. The application is doing what we want on a single thread but now we are trying to optimize where we see that there are performance issues (in this case, a calculation happening through another library). The main thread has a lot of responsibilities, including database access, which is why I want to limit what the worker thread actually does.

Update: I have already been successful in using std::thread to launch a worker thread that examines a Boost lockfree::queue and processes tasks placed in it. It's step 5 in @Pressacco's response that I'm having trouble with. Are there any examples of returning a value to the main thread when a worker thread is finished and informing the main thread, rather than simply waiting for the worker to finish?

jslmsca

2 Answers

1

If your objective is to develop the solution from scratch (using native threads, queues, etc.):

  1. create a thread-safe queue (Mutex/CriticalSection around add/remove)
  2. create a counting semaphore that is associated with the queue
  3. have one or more worker threads wait on the counting semaphore (i.e. the thread will block)
    • the semaphore is more efficient than having the thread constantly poll the queue
  4. as messages/jobs are added to the queue, increment the semaphore
    • a thread will wake up
    • the thread should remove one message
  5. if a result needs to be returned...
    • set up another: Queue+Semaphore+WorkerThreads
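
Steps 1 to 4 can be sketched in plain C++11 roughly as follows. C++11 has no counting semaphore, so this sketch swaps in a `std::condition_variable` for the same purpose: the worker sleeps until an item arrives instead of polling. `BlockingQueue` and `run_jobs` are hypothetical names made up for illustration, and treating a negative job as the shutdown request is an assumption of the sketch (inputs are expected to be non-negative).

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper: a mutex-protected queue whose pop() blocks until an
// item is available (a condition variable stands in for the semaphore).
template <typename T>
class BlockingQueue
{
public:
   void push(T value)
   {
      {
         std::lock_guard<std::mutex> lock(_mutex);
         _items.push(std::move(value));
      }
      _ready.notify_one();   // step 4: wake one waiting thread
   }

   T pop()
   {
      std::unique_lock<std::mutex> lock(_mutex);
      // step 3: sleep until the queue is non-empty (handles spurious wakeups)
      _ready.wait(lock, [this] { return !_items.empty(); });
      assert(!_items.empty());
      T value = std::move(_items.front());
      _items.pop();
      return value;
   }

private:
   std::mutex _mutex;
   std::condition_variable _ready;
   std::queue<T> _items;
};

// Hypothetical driver: one worker doubles each job and pushes the result
// onto a second queue (step 5). A negative job means "shut down".
std::vector<int> run_jobs(const std::vector<int>& inputs)
{
   BlockingQueue<int> jobs;
   BlockingQueue<int> results;

   std::thread worker([&jobs, &results] {
      for (;;)
      {
         int job = jobs.pop();
         if (job < 0) { break; }
         results.push(job * 2);
      }
   });

   for (int value : inputs) { jobs.push(value); }
   jobs.push(-1);
   worker.join();

   std::vector<int> output;
   for (std::size_t i = 0; i < inputs.size(); ++i)
   {
      output.push_back(results.pop());
   }
   return output;
}
```

Calling `run_jobs({1, 2, 3})` from `main` would hand three jobs to the worker and collect the doubled values from the results queue. In this sketch the caller blocks in `results.pop()`; a main thread that must stay responsive would use a non-blocking check instead.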

ADDITIONAL NOTES

If you decide to implement a thread safe queue from scratch, take a look at:

With that said, I would take another look at Boost. I haven't used the library, but from what I hear it will most likely contain some relevant data structures (e.g. a thread-safe queue).

My favorite quote from the MSDN:

"When you use multithreading of any sort, you potentially expose yourself to very serious and complex bugs"

SIDEBAR

Since you are looking at concurrent programming for the first time, you may wish to consider:

  • Is your objective to build production-worthy code, or is this simply a learning exercise?
    • production? consider using existing, proven libraries
    • learning? consider writing the code from scratch
  • Consider using a thread pool with an asynchronous callback instead of native threads.
  • more threads != better
  • Are threads really needed?
  • Follow the KISS principle.
Pressacco
  • Hi, it's Step 5 that I'm having trouble wrapping my head around. The result needs to be returned to the main thread, so while the main thread is off doing all the other things, how do I inform it that "You have to examine this queue now"? It's not another worker thread and I obviously cannot block the main thread for any length of time that would make the system unresponsive. – jslmsca May 09 '15 at 20:14
  • +1 for the [KISS principle](http://stackoverflow.com/questions/30110792/boostasio-with-main-workers-threads-can-i-start-event-loop-before-posting-wo#comment48337145_30112460) – sehe May 09 '15 at 22:46
  • If the main thread has other work to do, let it. Just make sure that the main thread periodically checks its message queue to see if there are any messages. This check can be non-blocking: if queue.count > 0 then queue.remove message – Pressacco May 10 '15 at 02:05
  • That's the crux of it... 1) How do I do this periodic checking? All I can think of is a global timer at the application level that periodically calls a method on this object to check the queue. 2) You did say "message" queue -- I'm assuming this "message" could just be an instance of data that needs to be processed? 3) Don't I have to set a lock/mutex on the output queue when the main thread is processing it in case the worker thread is trying to add additional items? – jslmsca May 10 '15 at 03:50
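
The non-blocking check discussed in these comments might look something like the sketch below. It assumes a plain mutex-protected output queue; `ResultQueue`, `drain_pending`, and `poll_demo` are hypothetical names, not part of any library. The lock is held only for the duration of a single push or pop, so the main thread never waits for the worker's calculation, only for the brief queue operation.

```cpp
#include <cassert>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical output queue: the worker pushes, the main thread polls.
class ResultQueue
{
public:
   void push(int value)
   {
      std::lock_guard<std::mutex> lock(_mutex);
      _items.push(value);
   }

   // Non-blocking: returns false immediately when nothing is pending.
   bool try_pop(int& value)
   {
      std::lock_guard<std::mutex> lock(_mutex);
      if (_items.empty()) { return false; }
      value = _items.front();
      _items.pop();
      return true;
   }

private:
   std::mutex _mutex;
   std::queue<int> _items;
};

// Intended to be called on the main thread from a timer tick or once per
// iteration of its loop; handles whatever is pending and returns at once.
std::vector<int> drain_pending(ResultQueue& queue)
{
   std::vector<int> handled;
   int value = 0;
   while (queue.try_pop(value))
   {
      handled.push_back(value);
   }
   return handled;
}

// Small demonstration: a worker produces three results, then the caller
// drains them. The join() is only there to make the demo deterministic.
std::vector<int> poll_demo()
{
   ResultQueue results;
   std::thread worker([&results] {
      for (int i = 1; i <= 3; ++i) { results.push(i * 2); }
   });
   worker.join();
   return drain_pending(results);
}
```

In a real application the main thread would not join the worker before polling; it would simply call something like `drain_pending` periodically and carry on when the returned list is empty. A "message" here is just a piece of data (an `int` in this sketch).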
0

The feedback above led me in the right direction for what I needed. The solution was definitely simpler than having to use signals/slots or Boost::Asio as I had previously attempted. I have two lock-free queues, one for input (on a worker thread) and one for output (on the main thread, populated by the worker thread). I use a timer to schedule when the output queue is processed. The code is below; perhaps it is of use to somebody:

//Task.h

#include <iostream>
#include <thread>


class Task
{
public:
   Task(bool shutdown = false) : _shutdown(shutdown) {};
   virtual ~Task() {};

   bool IsShutdownRequest() { return _shutdown; }

   virtual int Execute() = 0;

private:
   bool _shutdown;
};


class ShutdownTask : public Task
{
public:
   ShutdownTask() : Task(true) {}

   virtual int Execute() { return -1; }
};


class TimeSeriesTask : public Task
{
public:
   TimeSeriesTask(int value) : _value(value) {};

   virtual int Execute()
   {
      std::cout << "Calculating on thread " << std::this_thread::get_id() << std::endl;
      return _value * 2;
   }

private:
   int _value;
};


// Main.cpp : Defines the entry point for the console application.

#include "stdafx.h"
#include "afxwin.h"

#include <memory>   // std::shared_ptr, std::make_shared

#include <boost/lockfree/spsc_queue.hpp>

#include "Task.h"

static UINT_PTR ProcessDataCheckTimerID = 0;
static const int ProcessDataCheckPeriodInMilliseconds = 100;


class Manager
{
public:
   Manager() 
   {
      //Worker Thread with application lifetime that processes a lock free queue
      _workerThread = std::thread(&Manager::ProcessInputData, this);
   };

   virtual ~Manager() 
   {
      _workerThread.join();
   };

   void QueueData(int x)
   {
      if (x > 0)
      {
         _inputQueue.push(std::make_shared<TimeSeriesTask>(x));
      }
      else
      {
         _inputQueue.push(std::make_shared<ShutdownTask>());
      }
   }

   void ProcessOutputData()
   {
      //process output data on the Main Thread
      _outputQueue.consume_one([&](int value)
      {
         if (value < 0)
         {
            PostQuitMessage(0);   //argument is the exit code, not a message ID
         }
         else
         {
            int result = value - 1;
            std::cout << "Final result is " << result << " on thread " << std::this_thread::get_id() << std::endl;
         }
      });
   }

private:
   void ProcessInputData()
   {
      bool shutdown = false;

      //Worker Thread processes input data indefinitely
      do
      {
         _inputQueue.consume_one([&](std::shared_ptr<Task> task)
         {    
            std::cout << "Getting element from input queue on thread " << std::this_thread::get_id() << std::endl;           

            if (task->IsShutdownRequest()) { shutdown = true; }

            int result = task->Execute();
            _outputQueue.push(result);
         });

      } while (shutdown == false);
   }

   std::thread _workerThread;
   boost::lockfree::spsc_queue<std::shared_ptr<Task>,   boost::lockfree::capacity<1024>> _inputQueue;
   boost::lockfree::spsc_queue<int, boost::lockfree::capacity<1024>> _outputQueue;
};


std::shared_ptr<Manager> g_pMgr;


//timer to force Main Thread to process Manager's output queue
void CALLBACK TimerCallback(HWND hWnd, UINT nMsg, UINT_PTR nIDEvent, DWORD dwTime)
{
   if (nIDEvent == ProcessDataCheckTimerID)
   {
      KillTimer(NULL, ProcessDataCheckTimerID);   //kill by timer ID, not by period
      ProcessDataCheckTimerID = 0;

      //call function to process data
      g_pMgr->ProcessOutputData();

      //reset timer
      ProcessDataCheckTimerID = SetTimer(NULL, ProcessDataCheckTimerID, ProcessDataCheckPeriodInMilliseconds, (TIMERPROC)&TimerCallback);
   }
}


int main()
{
   std::cout << "Main thread is " << std::this_thread::get_id() << std::endl;

   g_pMgr = std::make_shared<Manager>();

   ProcessDataCheckTimerID = SetTimer(NULL, ProcessDataCheckTimerID, ProcessDataCheckPeriodInMilliseconds, (TIMERPROC)&TimerCallback);

   //queue up some dummy data
   for (int i = 1; i <= 10; i++)
   {
      g_pMgr->QueueData(i);
   }

   //queue a shutdown request
   g_pMgr->QueueData(-1);

   //fake the application's message loop
   MSG msg;
   bool shutdown = false;
   while (shutdown == false)
   {
      if (GetMessage(&msg, NULL, 0, 0))
      {
         TranslateMessage(&msg);
         DispatchMessage(&msg);
      }
      else   
      {
         shutdown = true;
      }
   }

   return 0;
}
jslmsca
  • I'm glad my comments helped. FYI: waiting on an event is more efficient than while(true), and I would make sure that your queue is thread safe. If not, bad things will happen unexpectedly. (I have never used the Boost queue.) – Pressacco May 13 '15 at 01:21