I'm new to Boost.Asio. I want a "Manager" to process a lock-free queue on a worker thread and send the results back to the main thread. Borrowing heavily from the answer to "Boost Asio pattern with GUI and worker thread", I've been able to get close to what I want.
Here is the code:
#include "stdafx.h"

#include <atomic>
#include <iostream>
#include <memory>
#include <thread>

#include <boost/asio.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/optional.hpp>
#include <boost/signals2.hpp>

#include "Task.h"
class Manager
{
typedef boost::signals2::signal<void(int)> Signal;
public:
Manager()
{
Signal signal;
std::cout << "Main thread: " << std::this_thread::get_id() << std::endl;
_mainWork = boost::in_place(boost::ref(_mainService));
_workerWork = boost::in_place(boost::ref(_workerService));
_workerThread = std::thread(&Manager::workerMain, this);
_workerService.post(std::bind(&Manager::ProcessData, this, boost::ref(signal)));
};
virtual ~Manager() {};
void workerMain()
{
std::cout << "Worker thread: " << std::this_thread::get_id() << std::endl;
_workerService.poll();
}
void processResult(unsigned int x)
{
int result = x - 1;
std::cout << "Processing result = " << result << " on thread " << std::this_thread::get_id() << std::endl;
_numItemsPulled++;
if (_numItemsPushed == _numItemsPulled)
{
_mainWork = boost::none;
_mainService.stop();
}
}
void ProcessData(Signal& signal)
{
bool shutdown = false;
do
{
queue.consume_one([&](std::shared_ptr<Task> task)
{
if (task->IsShutdownRequest())
{
shutdown = true;
std::cout << "Shutting down on thread " << std::this_thread::get_id() << std::endl;
}
if (shutdown == false)
{
std::cout << "Getting element from queue on thread " << std::this_thread::get_id() << std::endl;
int result = task->Execute();
_mainService.post(std::bind(&Manager::processResult, this, result));
}
});
} while (shutdown == false);
}
void Push(int x)
{
if (x > 0)
{
std::shared_ptr<TimeSeriesTask> task = std::make_shared<TimeSeriesTask>(x);
queue.push(task);
_numItemsPushed++;
}
else
{
std::shared_ptr<ShutdownTask> task = std::make_shared<ShutdownTask>();
queue.push(task);
}
}
void QueueData(int x)
{
Push(x);
}
void StartEventLoop()
{
while (_mainService.stopped() == false)
{
_mainService.poll();
}
}
void Cleanup()
{
_workerWork = boost::none;
_workerThread.join();
}
private:
boost::asio::io_service _mainService;
boost::optional<boost::asio::io_service::work> _mainWork;
boost::asio::io_service _workerService;
boost::optional<boost::asio::io_service::work> _workerWork;
std::thread _workerThread;
int _numItemsPushed = 0;
int _numItemsPulled = 0;
boost::lockfree::spsc_queue<std::shared_ptr<Task>, boost::lockfree::capacity<1024>> queue;
};
int main()
{
std::shared_ptr<Manager> mgr = std::make_shared<Manager>();
mgr->QueueData(1);
mgr->QueueData(2);
mgr->QueueData(3);
mgr->StartEventLoop(); //why does work need to be posted first?
mgr->QueueData(-1);
mgr->Cleanup();
return 0;
}
As my comment line indicates: is there a way to start the event loop before posting work/queuing data? The goal is to have the event loop always polling and to have other objects queue data as required. I tried starting it in the Manager's constructor, but no work gets processed if work is posted afterwards.
One additional note: I can't block by using run, so polling seems to be the right choice.
Any guidance on what I'm missing is greatly appreciated. Thanks.
Additional info: I tried removing the while loop in StartEventLoop() and calling it before any queuing of data. Data gets queued and calculated on the worker thread but the result never gets sent back to the main thread.