13

I have sensor data of various types that needs to be processed at different stages. From what I have read, the most efficient way is to split the tasks into threads. Each puts the processed data into the entry queue of the next thread. So basically, a pipeline.

The data can be quite large (a few MB per item), so it needs to be copied out of the sensor buffer and then passed along to the threads that will modify it and pass it on.

I am interested in understanding the best way to do the passing. I read that, with message posting between threads, I could allocate the data and pass the pointer to the next thread, so the receiving thread takes care of de-allocating it. I am not quite sure how this would work for streaming data, i.e. how to make sure the threads process the messages in order (I guess I could add a time check?). Also, what data structure should I use for such an implementation? I presume I would need locks anyway?

Would it be more efficient to have synchronised queues?

Let me know if other solutions are better. The computations need to happen in real time, so this has to be really efficient. If anyone has links to good examples of data being passed through a pipeline of threads, I would be very interested in looking at them.

Caveats: no Boost or other libraries; using pthreads. I need to keep the implementation as close to the standard libraries as possible, since this will eventually be used on various platforms (which I don't know yet).
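To make the question concrete, here is a rough sketch of what I mean by posting a pointer and letting the receiving thread de-allocate it (the frame type, the stage bodies and all the names are made up, and error handling is omitted):

#include <pthread.h>
#include <deque>

struct SensorFrame { long seq; /* plus the few MB copied out of the sensor buffer */ };

// Hand-over point between two stages: a FIFO guarded by one mutex.
struct FrameQueue
{
    std::deque<SensorFrame*> items;
    pthread_mutex_t mtx;
    pthread_cond_t  cv;

    FrameQueue ()  { pthread_mutex_init ( &mtx, nullptr ); pthread_cond_init ( &cv, nullptr ); }
    ~FrameQueue () { pthread_mutex_destroy ( &mtx ); pthread_cond_destroy ( &cv ); }

    void post ( SensorFrame* f )            // the producer gives the pointer away here
    {
        pthread_mutex_lock ( &mtx );
        items.push_back ( f );
        pthread_mutex_unlock ( &mtx );
        pthread_cond_signal ( &cv );
    }
    SensorFrame* take ()                    // the consumer becomes the owner of the pointer
    {
        pthread_mutex_lock ( &mtx );
        while ( items.empty () )
            pthread_cond_wait ( &cv, &mtx );
        SensorFrame* f = items.front ();
        items.pop_front ();
        pthread_mutex_unlock ( &mtx );
        return f;
    }
};

FrameQueue stage2_in;                       // entry queue of the second stage

void* stage1 ( void* )                      // e.g. filtering
{
    for ( ;; )
    {
        SensorFrame* f = new SensorFrame;   // copy out of the sensor buffer
        /* ...fill and process *f... */
        stage2_in.post ( f );               // pass the same pointer downstream
    }
    return nullptr;
}

void* stage2 ( void* )                      // e.g. feature extraction, last stage
{
    for ( ;; )
    {
        SensorFrame* f = stage2_in.take ();
        /* ...process *f... */
        delete f;                           // the receiving thread de-allocates
    }
    return nullptr;
}

int main ()
{
    pthread_t t1, t2;
    pthread_create ( &t1, nullptr, stage1, nullptr );
    pthread_create ( &t2, nullptr, stage2, nullptr );
    pthread_join ( t1, nullptr );
    pthread_join ( t2, nullptr );
}

Since each stage pulls from a single FIFO, the frames should already come out in arrival order, so maybe the time check is not even needed; I just don't know whether this is the most efficient way to do it, hence the question.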

unixsnob
  • Is there anything wrong with [`thread`](http://en.cppreference.com/w/cpp/thread) and [`queue`](http://en.cppreference.com/w/cpp/container/queue)? Each thread could handle/represent a phase of processing and have two shared queues, one used for input and one for output. I am not sure how much gain you would get from such parallelization. Usually people try to parallelize the computation, not the process's logic itself, because it's hard to scale. You can also end up harming yourself, up to a deadlock, if you starve one element of the chain of computing power. Be careful. – luk32 Oct 21 '14 at 15:18
  • There is nothing wrong. But it does not answer the question; you just reiterated what I said in the sentence "Each puts the processed data into the entry queue of the next thread", i.e. adding nothing. – unixsnob Oct 21 '14 at 15:24
  • Why approach this the hard way when you have a complete library that does all this? https://www.threadingbuildingblocks.org/. I know your statement says you can't, but this supports all major platforms. – cageman Oct 21 '14 at 15:26
  • @unixsnob Well, I believe I pointed you to the specific standard library (NB: there is one standard library AFAIK) components that will allow you to implement stuff you want, which you seemed to have no idea about (those are clickable links in there). I won't implement or design whole solution for you. Your questions about efficiency cannot be answered properly as they depend on concrete use-cases. – luk32 Oct 21 '14 at 15:39
  • @luk32: The question was whether posting messages is better than queues. I already know about both, and I know about the libraries too; hence the question about them. I am pretty sure there are benchmarks out there and I was simply asking for pointers to them. So, as I said before, your comment is just paraphrasing my first paragraph. – unixsnob Oct 21 '14 at 15:44
  • Also, I am using pthreads and not `std::thread` for the exact reason stated in the caveats. – unixsnob Oct 21 '14 at 15:47
  • You say "*I need to keep the implementation as close to the standard libraries as possible.*". How is `pthreads` more standard than the actual parts of the standard?! – luk32 Oct 21 '14 at 17:04
  • read : http://stackoverflow.com/questions/13134186/c11-stdthreads-vs-posix-threads – unixsnob Oct 21 '14 at 17:18

1 Answer

11

I had to do something similar recently. I used the input/output-queue approach, which I think is the simplest fast way to do it. Below is my version of a thread-safe concurrent queue. In my project I have three worker threads doing lots of computation in sequence on the same buffer: each thread pops from its input queue and pushes to its output queue, and the wpop method waits until the next buffer is available in the queue. I hope it is useful for you.

/*
    Thread_safe queue with conditional variable
*/
#include<queue>
#include<chrono>
#include<mutex>
#include<condition_variable>
#include<atomic>

template<typename dataType>
class CConcurrentQueue
{
private:
    /// Queue
    std::queue<dataType> m_queue;       
    /// Mutex to control multiple access (mutable so const members can lock it)
    mutable std::mutex m_mutex;
    /// Conditional variable used to fire event
    std::condition_variable m_cv;       
    /// Atomic variable used to terminate immediately wpop and wtpop functions
    std::atomic<bool> m_forceExit { false };

public:
    /// <summary> Add a new element in the queue. </summary>
    /// <param name="data"> New element. </param>
    void push ( dataType const& data )
    {
        m_forceExit.store ( false );
        std::unique_lock<std::mutex> lk ( m_mutex );
        m_queue.push ( data );
        lk.unlock ();
        m_cv.notify_one ();
    }
    /// <summary> Check queue empty. </summary>
    /// <returns> True if the queue is empty. </returns>
    bool isEmpty () const
    {
        std::unique_lock<std::mutex> lk ( m_mutex );
        return m_queue.empty ();
    }
    /// <summary> Pop element from queue. </summary>
    /// <param name="popped_value"> [in,out] Element. </param>
    /// <returns> false if the queue is empty. </returns>
    bool pop ( dataType& popped_value )
    {
        std::unique_lock<std::mutex> lk ( m_mutex );
        if ( m_queue.empty () )
        {
            return false;
        }
        else
        {
            popped_value = m_queue.front ();
            m_queue.pop ();
            return true;
        }
    }
    /// <summary> Wait and pop an element in the queue. </summary>
    /// <param name="popped_value"> [in,out] Element. </param>
    ///  <returns> False for forced exit. </returns>
    bool wpop ( dataType& popped_value )
    {
        std::unique_lock<std::mutex> lk ( m_mutex );
        m_cv.wait ( lk, [&]()->bool{ return !m_queue.empty () || m_forceExit.load(); } );
        if ( m_forceExit.load() ) return false;
        popped_value = m_queue.front ();
        m_queue.pop ();
        return true;
    }
    /// <summary> Timed wait and pop an element in the queue. </summary>
    /// <param name="popped_value"> [in,out] Element. </param>
    /// <param name="milliseconds"> [in] Wait time. </param>
    ///  <returns> False for timeout or forced exit. </returns>
    bool wtpop ( dataType& popped_value , long milliseconds = 1000)
    {
        std::unique_lock<std::mutex> lk ( m_mutex );
        m_cv.wait_for ( lk, std::chrono::milliseconds ( milliseconds  ), [&]()->bool{ return !m_queue.empty () || m_forceExit.load(); } );
        if ( m_forceExit.load() ) return false;
        if ( m_queue.empty () ) return false;
        popped_value = m_queue.front ();
        m_queue.pop ();
        return true;
    }
    /// <summary> Queue size. </summary>    
    int size ()
    {   
        std::unique_lock<std::mutex> lk ( m_mutex );
        return static_cast< int >( m_queue.size () );
    }
    /// <summary> Free the queue and force stop. </summary>
    /// <remarks> Assumes the stored elements are pointers to new'ed objects;
    /// otherwise the delete below is invalid. </remarks>
    void clear ()
    { 
        m_forceExit.store( true );
        std::unique_lock<std::mutex> lk ( m_mutex );
        while ( !m_queue.empty () )
        {
            delete m_queue.front ();
            m_queue.pop ();
        }
        lk.unlock ();
        // Wake any thread blocked in wpop/wtpop so it sees the forced exit.
        m_cv.notify_all ();
    }
    /// <summary> Check queue in forced exit state. </summary>
    bool isExit () const
    {
        return m_forceExit.load();
    }

};
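For example, this is roughly how two stages can be wired together with these queues. The Buffer type, the stage lambdas and the numbers below are only placeholders for illustration; note that the buffers have to be new'ed pointers because clear () deletes whatever is still queued.

#include <thread>
#include <chrono>
#include <iostream>

struct Buffer { int id; /* large sensor payload */ };

int main ()
{
    CConcurrentQueue<Buffer*> q1, q2;       // one queue per hand-over point

    std::thread stage1 ( [&] ()             // first processing stage
    {
        Buffer* b = nullptr;
        while ( q1.wpop ( b ) )             // blocks until a buffer arrives or clear () is called
        {
            /* ...heavy computation on *b... */
            q2.push ( b );                  // hand the same pointer to the next stage
        }
    } );

    std::thread stage2 ( [&] ()             // last stage owns and frees the buffer
    {
        Buffer* b = nullptr;
        while ( q2.wpop ( b ) )
        {
            std::cout << "processed buffer " << b->id << std::endl;
            delete b;
        }
    } );

    for ( int i = 0; i < 10; ++i )          // producer: one new'ed buffer per sensor frame
        q1.push ( new Buffer { i } );

    std::this_thread::sleep_for ( std::chrono::seconds ( 1 ) );
    q1.clear ();                            // stop the first stage ...
    stage1.join ();
    q2.clear ();                            // ... then the second
    stage2.join ();
    return 0;
}

Because each queue here has a single producer and a single consumer, the buffers reach every stage in the order they were pushed, so no extra timestamp check is needed for ordering.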
IFeelGood
  • I get an error when trying to use this: error: no matching function for call to ‘std::unique_lock::unique_lock(const std::mutex&)’ std::unique_lock lk ( m_mutex ); – VoteCoffee Sep 30 '20 at 23:14
  • I forgot all the #include directives. – IFeelGood Sep 22 '21 at 09:09
  • just use moodycamel library for this: https://github.com/cameron314/concurrentqueue – avernus Sep 09 '22 at 23:02
  • `delete m_queue.front ();` is going to be UB if the queue is used for anything else than storing pointers to `new`ed objects, which the interface however doesn't enforce. This is also not going to work with non-copyable types. – user17732522 Sep 11 '22 at 17:06