
I'm new to Boost.Asio. I want a "Manager" to process a lock-free queue on a worker thread and send the results back to the main thread. Borrowing heavily from the answer here (Boost Asio pattern with GUI and worker thread), I've been able to get close to what I want.

Here is the code:

#include "stdafx.h"

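#include <boost/asio.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/optional.hpp>
#include <boost/signals2.hpp>
#include <boost/utility/in_place_factory.hpp>

#include <functional>
#include <iostream>
#include <memory>
#include <thread>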

#include "Task.h"


class Manager
{
   typedef boost::signals2::signal<void(int)> Signal;

public:
   Manager()
   {
      Signal signal;
      std::cout << "Main thread: " << std::this_thread::get_id() << std::endl;

      _mainWork = boost::in_place(boost::ref(_mainService));
      _workerWork = boost::in_place(boost::ref(_workerService));

      _workerThread = std::thread(&Manager::workerMain, this);
      _workerService.post(std::bind(&Manager::ProcessData, this, boost::ref(signal)));
   }

   virtual ~Manager() {};

   void workerMain()
   {
      std::cout << "Worker thread: " << std::this_thread::get_id() << std::endl;
      _workerService.poll();
   }

   void processResult(unsigned int x)
   {
      int result = x - 1;
      std::cout << "Processing result = " << result << " on thread " << std::this_thread::get_id() << std::endl;

      _numItemsPulled++;
      if (_numItemsPushed == _numItemsPulled)
      {
         _mainWork = boost::none;
         _mainService.stop();
      }
   }

   void ProcessData(Signal& signal)
   {
      bool shutdown = false;

      do
      {
         queue.consume_one([&](std::shared_ptr<Task> task)
         {
            if (task->IsShutdownRequest())
            {
               shutdown = true;
               std::cout << "Shutting down on thread " << std::this_thread::get_id() << std::endl;
            }

            if (shutdown == false)
            {
               std::cout << "Getting element from queue on thread " << std::this_thread::get_id() << std::endl;
               int result = task->Execute();
               _mainService.post(std::bind(&Manager::processResult, this, result));
            }
         });

      } while (shutdown == false);
   }

   void Push(int x)
   {
      if (x > 0)
      {
         std::shared_ptr<TimeSeriesTask> task = std::make_shared<TimeSeriesTask>(x);
         queue.push(task);
         _numItemsPushed++;
      }
      else
      {
         std::shared_ptr<ShutdownTask> task = std::make_shared<ShutdownTask>();
         queue.push(task);
      }
   }

   void QueueData(int x)
   {
      Push(x);
   }

   void StartEventLoop()
   {
      while (_mainService.stopped() == false)
      {
         _mainService.poll();
      }
   }

   void Cleanup()
   {
      _workerWork = boost::none;
      _workerThread.join();
   }

private:
   boost::asio::io_service _mainService;
   boost::optional<boost::asio::io_service::work> _mainWork;
   boost::asio::io_service _workerService;
   boost::optional<boost::asio::io_service::work> _workerWork;
   std::thread _workerThread;

   int _numItemsPushed = 0;
   int _numItemsPulled = 0;
   boost::lockfree::spsc_queue<std::shared_ptr<Task>, boost::lockfree::capacity<1024>> queue;
};


int main()
{
   std::shared_ptr<Manager> mgr = std::make_shared<Manager>();

   mgr->QueueData(1);
   mgr->QueueData(2);
   mgr->QueueData(3);

   mgr->StartEventLoop(); //why does work need to be posted first?

   mgr->QueueData(-1);
   mgr->Cleanup();

   return 0;
}

As the comment in main() indicates: is there a way to start the event loop before posting work/queuing data? The goal is to have the event loop polling all the time while other objects queue data as required. I tried starting it in the Manager's constructor, but no work gets processed if it is posted afterwards.

One additional note: I can't block by using run, so polling seems to be the right choice.

Any guidance on what I'm missing is greatly appreciated. Thanks.

Additional info: I tried removing the while loop in StartEventLoop() and calling it before queuing any data. The data gets queued and processed on the worker thread, but the result never gets sent back to the main thread.

jslmsca
  • I don't really see what you are trying to achieve here. Why do you have two different `io_service` instances? Since `StartEventLoop()` blocks until `_mainService` is stopped, how would `mgr->QueueData(-1);` ever be executed? – m.s. May 07 '15 at 20:22
  • The reason for two io_service instances is to have the worker do the calculation and then have the main thread do additional processing (as per the example in the hyperlink). The main thread will eventually send out another signal which cannot be on a worker thread. When `processResult` sees that the number of events pushed equals the number of events processed, it stops `_mainService`. – jslmsca May 07 '15 at 20:25

1 Answer
  1. This is a problem

    Signal signal;
    // ...
    _workerService.post(std::bind(&Manager::ProcessData, this, boost::ref(signal)));
    

    You're binding a task to a local variable that ceases to exist immediately after. Your program invokes Undefined Behaviour.

  2. This too

    void workerMain()
    {
       std::cout << "Worker thread: " << std::this_thread::get_id() << std::endl;
       _workerService.poll();
    }
    

    poll() returns as soon as there are no more ready handlers, so the thread exits prematurely. Use run() to keep the thread alive until _workerWork is reset.

  3. The largest problem is that you post an infinite processing loop onto the event queue. ProcessData never returns, which is why the queue is blocked (and since there's only one service thread, the block is permanent).

    If you get rid of the loop and instead repost after each pass:

    void ProcessData(Signal &signal) {
        bool shutdown = false;
    
        queue.consume_one([&](std::shared_ptr<Task> task) {
            if (task->IsShutdownRequest()) {
                shutdown = true;
                std::cout << "Shutting down on thread " << std::this_thread::get_id() << std::endl;
            }
    
            if (shutdown == false) {
                std::cout << "Getting element from queue on thread " << std::this_thread::get_id() << std::endl;
                int result = task->Execute();
                _mainService.post(std::bind(&Manager::processResult, this, result));
            }
        });
    
        if (!shutdown)
            _workerService.post(std::bind(&Manager::ProcessData, this, boost::ref(signal)));
    }
    

    This would at least not deadlock, but it will likely create very high CPU load.

  4. Finally, you need to run StartEventLoop asynchronously, or it will never return. Running it on a separate thread might seem like an option, but that could well violate the threading-model requirements of spsc_queue (exactly one producer thread and one consumer thread). Beware here.


Frankly, all this looks hopelessly over-complicated. I suspect you

  • either want low-latency high-throughput processing, in which case you want no asio at all, and just have a workerThread looping on consume_one

    See the Demo Listing below.

  • or you want low-cost queueing and processing on an isolated thread. In that case, use an old-fashioned locking queue; see e.g.

    • c++ work queues with blocking (where Solution #1 uses Boost Asio to implement the queueing, and Solution #2 uses a simpler condition variable/mutex combination)
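For the second option, a bare-bones version of the condition variable/mutex approach (in the spirit of Solution #2 in the linked question, not copied from it) could look like this. The worker sleeps while the queue is empty instead of spinning, and only the worker ever waits; Push() holds the mutex just long enough to enqueue. WorkQueue and its members are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

// Minimal blocking work queue with a single worker thread.
class WorkQueue {
public:
    WorkQueue() : _worker([this] { workerMain(); }) {}

    ~WorkQueue() {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _shutdown = true;
        }
        _cv.notify_one();
        _worker.join();  // the worker drains the remaining tasks first
    }

    // Any thread: enqueue a task; never waits for the worker.
    void Push(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            assert(!_shutdown);  // pushing during destruction is a bug
            _tasks.push(std::move(task));
        }
        _cv.notify_one();
    }

private:
    void workerMain() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lk(_mutex);
                // Sleep until there is a task or a shutdown request.
                _cv.wait(lk, [this] { return _shutdown || !_tasks.empty(); });
                if (_tasks.empty()) return;  // shutdown and nothing left to do
                task = std::move(_tasks.front());
                _tasks.pop();
            }
            task();  // run outside the lock so Push() is never held up
        }
    }

    std::mutex _mutex;
    std::condition_variable _cv;
    std::queue<std::function<void()>> _tasks;
    bool _shutdown = false;
    std::thread _worker;  // declared last so it starts after the members above
};
```

The trade-off versus spsc_queue is a brief lock per push and pop, in exchange for zero CPU use while idle and no single-producer/single-consumer restriction.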

Demo Listing

#include <boost/atomic.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <memory>

struct Task {
    virtual ~Task() = default;
    virtual int  Execute()           const = 0;
    virtual bool IsShutdownRequest() const { return false; }
};

struct TimeSeriesTask : Task {
    TimeSeriesTask(int id) : _id(id) {}
    virtual int  Execute()           const { return _id;   }

  private:
    int _id;
};

struct ShutdownTask : Task {
    virtual int  Execute()           const { return 0;    }
    virtual bool IsShutdownRequest() const { return true; }
};

class Manager {
  public:
    Manager() : _running(false)
    { }

    void Start() {
        boost::lock_guard<boost::mutex> lk(_stateMutex);
        if (!_running) {
            _running = true;
            _workerThread = boost::thread(&Manager::workerMain, this);
        }
    }

    void workerMain() {
        while(_running) {
            queue.consume_one([this](std::shared_ptr<Task> task) {
                if (task->IsShutdownRequest()) {
                    _running = false;
                } else {
                    int result = task->Execute();
                    processResult(result);
                }
            });
        }
    }

    void processResult(unsigned int x) {
        int result = x - 1;
        std::cout << "Processing result = " << result << " on thread " << boost::this_thread::get_id() << std::endl;
    }

    void QueueData(int x) {
       if (x > 0)
            queue.push(std::make_shared<TimeSeriesTask>(x));
        else
            queue.push(std::make_shared<ShutdownTask>());
    }

    void Cleanup() {
        if (_workerThread.joinable())
            _workerThread.join();
    }

  private:
    boost::mutex _stateMutex;
    boost::atomic_bool _running;
    boost::lockfree::spsc_queue<std::shared_ptr<Task>, boost::lockfree::capacity<1024> > queue;
    boost::thread _workerThread;
};

int main()
{
    Manager mgr;
    mgr.Start();

    mgr.QueueData(1);
    mgr.QueueData(2);
    mgr.QueueData(3);

    mgr.QueueData(-1);
    mgr.Cleanup();
}
sehe
  • Thanks for the response. A few things... 1) I've made the signal a member variable; 2) From my understanding, `run()` blocks the worker thread. I don't want the main thread to be waiting for the worker to finish but rather process data as it is completed (hence, the signal to the main thread); 3) I am trying to get a processing loop on a worker thread that starts up when the Manager is created and doesn't shut down until the end of the application. Even if I remove the `do/while` loop in `ProcessData`, nothing ever gets queued if `StartEventLoop` is called before any queuing. – jslmsca May 07 '15 at 22:39
  • `run()` _runs_ on the worker thread. That's not blocking. `poll()` just returns, so it _exits_ the worker thread. That's not useful. OTOH `ProcessData` _did_ block the `_workerService`, so that's pretty much achieving the inverse of what you wanted. Also, the tracking of `numPulled` and `numPushed` is just asking for race conditions. I've added my simpler demo. Strongly consider /not optimizing prematurely/ and using a blocking queue here. Threads don't have to wait **and** you don't have to juggle complicated async event queues that you obviously lack experience with. – sehe May 07 '15 at 23:00
  • ^^ what @sehe said. If you can avoid the complication, do so. – Martin James May 07 '15 at 23:07
  • Thanks for clarifying. I will need some time to process the alternatives. The original reason for asio was the need to send a signal and data from the worker thread to the main thread for additional processing without blocking the main thread waiting for the worker to finish. The worker thread should continue processing queue data until the application shuts down, sending signals to the main thread when the data is ready. Condition variables block and I couldn't find another way. – jslmsca May 07 '15 at 23:09
  • FWIW Whether Asio internal queues block is not specified. They might, or they might not, AFAIK. One thing is certain: the performance will not improve with all the extra queuing implied. Perhaps see this interesting chat [On lockfree logging, queuing and memory order](http://chat.stackoverflow.com/rooms/69990/on-lockfree-logging-queuing-and-memory-order). It covers a lot of the same ground (also approaches with/without Asio and `spsc_queue`) with benchmarks included. – sehe May 07 '15 at 23:13
  • From what I can tell in the Demo Listing, `processResult` is happening on the workerThread. Unfortunately, that is why I was using asio/signals because I need the result sent to the mainThread. In our real application, `Execute()` does a calculation (the real work) and sends the result to the mainThread which then needs to manipulate the data based on user selections and then fires off another signal to listening objects (which may then involve database access). I suppose there is no simple way to send a signal with data from a worker thread to the main thread outside of asio? – jslmsca May 08 '15 at 00:59
  • Gee. That sounds like it could have been your question instead. Consider asking it as a separate question. – sehe May 08 '15 at 05:54
  • Thanks, sehe, for your help. I will read through your links and examples first. – jslmsca May 08 '15 at 14:48