-1

I'm currently having a problem regarding joining a finished std::thread. I have a simple consumer class:

//Consumer.h
class Consumer
{
public:
    //Ctor, Dtor

    void Start();
    void Release();


private:
    void Run();

    std::atomic<bool> m_terminate;
    std::thread m_thread;
};



//Consumer.cpp
void Consumer::Start()
{
    m_thread = std::thread(&EncodingProcessor::Run, this);
}

void Consumer::Release()
{
    m_terminate = true;
    if (m_thread.joinable())
        m_thread.join();        //Here is the deadlock
}

void Consumer::Run()
{
    while (true)
    {
        if (m_terminate)
            break;


        //This queue is blocking, it is fed with events from an encoder
        PMVR::HardwareEncoder::EventWithId currentEvent = m_hwEncoder.GetEncodeEvent();
        //If there is an event, again, wait for it, until the according frame is encoded
        WaitForSingleObject(currentEvent.handle, INFINITE);

        //Lock the encoded bitstream
        PMVR::HardwareEncoder::BitStreamBufferInfo currentData =
        m_hwEncoder.LockBitstream(currentEvent.id);

        std::vector<unsigned char> tmp((unsigned char*)currentData.bitstreamPointer,
        (unsigned char*)currentData.bitstreamPointer + currentData.bitstreamSize);
        //And process it.
        m_consumer->ProcessEncodingResult(currentData.bitstreamPointer, currentData.bitstreamSize);
        m_hwEncoder.UnlockBitstream(currentEvent.id);
    }
}

So I can start the thread. The thread does what it should. I can end the thread, so the loop inside Run() gets broken. However, if I want to join the thread, I encounter a deadlock.

We are not talking about the thread beeing released after main() finished. I can Release() it via keypress but non of the times it will work.

Edit: Start() is called this way:

m_processorThread = new Consumer(*m_hwEncoder,
    std::make_unique<FileSystemWriter>("file.h264"));
m_processorThread->Start();

Release() is called this way:

if (glfwGetKey(handler->GetWindow(), GLFW_KEY_M) && !m_pressed)
{
    m_pressed = true;
    sessionAPI.Close();
}

sessionAPI.close() just calls Release(). Nothing more.

Edit2:

I'm sorry, you are right. The code I've posted so far is working... So the problem seems to be inside the Run() method (updated, see above).

So my misconception was, that because of breaking at the top of the loop, the whole stuff beneath it wouldn't get executed... It looks like GetEncodeEvent() produces the deadlock. But why? Is there an elegant way of breaking the whole loop at a point where the thread isn't waiting for something? In addition, the provider of events is still alive, so there should be notifications...

Christoph
  • 606
  • 7
  • 21

2 Answers2

1

The problem I think, is here:

{
    if (m_terminate)
        break;


    //This queue is blocking, it is fed with events from an encoder
    PMVR::HardwareEncoder::EventWithId currentEvent = m_hwEncoder.GetEncodeEvent();
    //If there is an event, again, wait for it, until the according frame is encoded
    WaitForSingleObject(currentEvent.handle, INFINITE);

It' all very well to set m_terminate to true, but your thread is not looking there. It's blocked on the WaitForSingleObject line.

This is a good argument for using std::condition_variable.

example:

#include <condition_variable>
#include <mutex>
#include <thread>
#include <queue>
#include <cassert>

struct some_work {};

struct worker
{

  void start()
  {
    assert(_stopped);
    _stopped = false;
    // memory fence happened here. The above write is safe
    _thread = std::thread(&worker::run, this);
  }

  void stop()
  {
    auto lock = std::unique_lock<std::mutex>(_sc_mutex);
    // this is a memory fence
    assert(!_stopped);
    _stopped = true;
    // so is this
    lock.unlock();
    // notify_all, in case someone adds a thread pool and does not look here!
    // note: notify *after* we have released the lock.
    _state_changed.notify_all(); 
    if (_thread.joinable())
      _thread.join();
  }

  void post_work(some_work w)
  {
    auto lock = std::unique_lock<std::mutex>(_sc_mutex);
    assert(!_stopped);
    _workload.push(std::move(w));
    lock.unlock();
    // only notify one - we only added one piece of work.
    _state_changed.notify_one();
  }

  // allows a monitor to wait until all work is flushed before
  // stopping if necessary
  void wait()
  {
    auto lock = std::unique_lock<std::mutex>(_sc_mutex);
    _maybe_stop.wait(lock, [this] 
                        {
                          return should_stop()
                            or no_more_work();
                        });
  }

private:

  void run()
  {
    std::unique_lock<std::mutex> lock(_sc_mutex);
    _state_changed.wait(lock, [this]
                        { 
                          return this->work_to_do() 
                            or this->should_stop();
                        });
    if (should_stop())
      return;

    // there is work to do...
    auto my_work = std::move(_workload.front());
    _workload.pop();
    lock.unlock();

    // do my work here, once we've locked.

    // this is here for the wait() function above.
    // if you don't want a wait(), you can dump this
    lock.lock();
    if (no_more_work() or should_stop())
    {
        lock.unlock();
        _maybe_stop.notify_all();
    }

  }

  bool work_to_do() const { return not _workload.empty(); }
  bool no_more_work() const { return _workload.empty(); }
  bool should_stop() const { return _stopped; }

  std::mutex _sc_mutex;
  std::condition_variable _state_changed;
  std::condition_variable _maybe_stop;

  std::queue<some_work> _workload;

  std::thread _thread;

  bool _stopped = true;

};

int main()
{
  worker w;
  w.start();
  w.post_work(some_work());
  w.post_work(some_work());
  w.post_work(some_work());
  w.post_work(some_work());
  w.post_work(some_work());
  w.post_work(some_work());

  // do we want to ensure that all the work is done?
  w.wait();
  w.stop();
}
Richard Hodges
  • 68,278
  • 7
  • 90
  • 142
  • 1
    @kfsone I'm not sure I follow? – Richard Hodges Aug 23 '16 at 19:52
  • The thing i don't understand is: The thread is never waiting for long. So there are 60 events per second. If i monitor m_terminate without actually breaking the loop it would look like this: 0,0,0,0,0,(press M to set terminate to true)1,1,1,1,1 and so on. With pressing the termination key, there is nothing more that stops working except this consumer thread. So even if it is, at that time, waiting for an other event, it should get one and afterwards go on, notice the change to terminate and break. But it doesn't. – Christoph Aug 23 '16 at 20:08
  • The main thread so to say. It's one producer, one consumer. The producer (OpenGL + Encoder) is the main application. It is guaranteed that the producer lives longer or at least exactly as long as the consuming thread. – Christoph Aug 23 '16 at 22:17
  • @Christoph so what if it decides to join (and therefore produce no more events) while the consuming thread is waiting on `WaitForSingleObject`? – Richard Hodges Aug 23 '16 at 22:33
  • Of course, that's what was missing for me. Thanks a lot! – Christoph Aug 24 '16 at 09:13
1

Your code indicates that GetEncodeEvent is blocking. If this is true, then its possible for your code to sit at that line of code indefinitely without ever seeing the change in m_terminate. Subsequently, the code may sit for an indefinite period of time at the WaitForSingleObject.

You might want to consider testing m_terminate throughout your function.

You can't interrupt WaitForSingleObject but you can specify a timeout and simply wrap that in a loop

for (;;) {
    if (m_terminate)
        return;
    auto res = WaitForSingleObject(currentEvent.handle, 20);
    switch (res) { // check the return value
      case WAIT_TIMEOUT: continue;
      case WAIT_OBJECT_0: break;
      default:  error(...);
    }
}

Your other option would be to create a WaitEvent for the thread and use WaitForMultipleObjects both handles and use SetEvent in Consumer::Release to notify the thread.

kfsone
  • 23,617
  • 2
  • 42
  • 74