0

In the complete code (see below), at some point (after N iterations) the consumer (function executionServiceHandler while (inService_) is entered, despite inService_ having been changed (in the same thread) [or so it seems from log output].

I have as far as possible tried to guard std::cout as well, although I'm not sure this is necessary.

Could you perhaps crit the code. I know strictly speaking I don't need to use StopServiceCmd to let the thread complete, but in my mind I can't see why it shouldn't work.

Also, I could have used std::function, but the example is simplified from original.

I could add that I tested this on GCC (latest), after the original example failed on VCC (2017)

Code follows (EDIT - now using std::function:

#include <thread>
#include <functional>
#include <mutex>
#include <iostream>
#include <queue>
#include <sstream>
#include <condition_variable>

class ThreadCmdConsumer
{
public:
    using Guard = std::lock_guard<std::mutex>;
    using UniqueLock = std::unique_lock<std::mutex>;

    ThreadCmdConsumer()
        : inService_(),
        executionServiceThread_(std::bind(&ThreadCmdConsumer::executionServiceHandler, this))
    {
        //Wait while thread does not exist...
        UniqueLock lock(cmdQMutex_);
        while (!inService_) {
            conditional_.wait(lock);
        }
        std::cout << std::hex << this << " constructed and consumer waiting..." << std::endl;
    }

    ~ThreadCmdConsumer() {
        static std::size_t nth = 0;
        {
            UniqueLock lock(cmdQMutex_);
            std::cout << "destructing (" << std::dec << nth++ << "): " << std::hex << this << std::endl;
        }
        process([this](){
            //Note: inService_ can only be changed in consumer thread...
            inService_ = false;
            std::cout << "consumer_->inService state: " << inService_ << std::endl;
        });

        UniqueLock lock(cmdQMutex_);
        std::cout << "producer " << std::hex << this << " destructor has lock" << std::endl;
        while (inService_) {
            std::cout << "producer " << std::hex << this << " destructor in service, entering wait" << std::endl;
            conditional_.wait(lock);
            std::cout << "producer " << std::hex << this << " destructor exited wait - has lock" << std::endl;
        }
        // Join will always succeed as result of StopServiceCmd that sets inService to false 
        // (in its own thread context). Once join completes, we are certain of executionServiceHandler
        // exiting normally.
        std::cout << "produces " << std::hex << this << " destructor joining" << std::endl;
        lock.unlock();

        try {
            executionServiceThread_.join();
        }
        catch (const std::system_error& ex) {
            UniqueLock lock(cmdQMutex_);//for cout
            std::cout << "Exception during join" << ex.what() << std::endl;
            abort();
        }
    }

    void executionServiceHandler()
    {
        { //Starts the service...
            UniqueLock lock(cmdQMutex_);
            inService_ = true;
            lock.unlock();
            conditional_.notify_one();
        }

        try {
        UniqueLock lock(cmdQMutex_);
        while (inService_) {
            std::cout << "consumer " << std::hex << this << " has lock" << std::endl;
            // Catering for spurious wake-ups too, hence while...
            while (cmdQ_.empty()) {
                std::cout << "consumer " << std::hex << this << " waiting" << std::endl;
                conditional_.wait(lock);
                std::cout << "consumer " << std::hex << this << " woken" << std::endl;
            }
            //Now we have the lock, and queue most certainly not empty
            auto cmd = std::move(cmdQ_.front());
            cmdQ_.pop();
            //@@@lock.unlock(); // Don't want to be locked while executing... removed conservatively
              (cmd)();
            } 

            std::cout << "consumer " << std::hex << this << " execution complete" << std::endl;
        }
        catch(const std::exception& ex) {
           std::cerr << "Unexpected " << ex.what() << std::endl;
           abort();
        }    
        //Not in service - notify when we've left (then we can truly join...)
        conditional_.notify_one();
    }

    void process(std::function<void()>&& cmd)
    {
        UniqueLock lock(cmdQMutex_);
        std::cout << "producer " << std::hex << this << " has lock" << std::endl;
        bool notificationRequired = cmdQ_.empty();
        cmdQ_.push(move(cmd));
        if (notificationRequired) {
            std::cout << "producer " << std::hex << this << " notifying" << std::endl;
            lock.unlock();
            conditional_.notify_one();
        }
        else {
            std::cout << "producer " << std::hex << this << " not notifying" << std::endl;
        }
    }

private:
    bool inService_;
    std::queue<std::function<void()>> cmdQ_;
    std::condition_variable conditional_;
    std::mutex cmdQMutex_;
    std::thread executionServiceThread_;
};


typedef std::function<void(const std::string&)> Handler;

struct ThreadOwner
{
    ThreadCmdConsumer executor_;
    // Do it done in the context of executor....
    void doIt(const Handler& handler)
    { }
};

int main()
{
    std::cout << "Program started" << std::endl;
    //Sometimes deadlocks on thread being killed
    for (int i = 0; i < 1000; ++i) {
        auto handler = [](const std::string&){};
        {
            ThreadOwner owner;
            owner.executor_.process([&handler, &owner]() {
            owner.doIt(handler);
            });
        }
    }
}
Werner Erasmus
  • 3,988
  • 17
  • 31
  • This awful lot of code. I'd tip you [Spurious wakeup](https://en.wikipedia.org/wiki/Spurious_wakeup). See http://en.cppreference.com/w/cpp/thread/condition_variable. Could yo also post a sample output? – Mihayl Nov 22 '17 at 10:40
  • I apologize for the amount of code, but this is a minimal (considering it handles destruction). I am considering spurious wakeup, hence the whiles – Werner Erasmus Nov 22 '17 at 10:45
  • I missed `#include ` in Visual Studio. – Mihayl Nov 22 '17 at 10:46
  • OK, I'll add that – Werner Erasmus Nov 22 '17 at 10:46
  • It ran without issues on Windows 8.1 64bit compiled with VS2013 in 32bit release – Mihayl Nov 22 '17 at 10:57
  • @AA, I would like it to run everywhere, and I would appreciate good review as to why I have a problem running this on GCC – Werner Erasmus Nov 22 '17 at 11:06
  • 1
    Last comment. I could reproduce it with gcc 4.8.5 on CentOS 7 x86_64. But you have to simplify to the bare minimal. It's still too much. – Mihayl Nov 22 '17 at 11:21
  • @AA. I might reduce it when I have some time – Werner Erasmus Nov 22 '17 at 11:27
  • @AA, reduced to use std::function – Werner Erasmus Nov 22 '17 at 12:02
  • Init your `inService_` with `false`, even the `std:.atomic` you used before is just a `struct` and need to be initialized – Mihayl Nov 22 '17 at 13:29
  • 2
    @AA `inService_()` - sets inService_ to `false`. – Maxim Egorushkin Nov 22 '17 at 13:29
  • @MaximEgorushkin, right – Mihayl Nov 22 '17 at 13:35
  • Your error does not reproduce. May be it did before you commented out `lock.unlock()` before `(cmd)()` because it acquires the mutex outside the while loop in `executionServiceHandler`. – Maxim Egorushkin Nov 22 '17 at 13:40
  • @MaximEgorushkin, It reproduces as is. I originally unlocked to prevent possible deadlock due to callback possible locking, but simplified the callback. As mentioned by AA, reproduces on gcc 4.8.5 (I've run on ideone) – Werner Erasmus Nov 22 '17 at 13:47
  • @WernerErasmus They say it reproduced 2 hours ago and then 1 hour ago you edited the code. I could not reproduce it and I cannot spot an issue with the current code. I could be wrong. – Maxim Egorushkin Nov 22 '17 at 13:50
  • @MaximEgorushkin (appreciate your eyes), code at https://ideone.com/fHHT3T fails, as just pasted from above just now. – Werner Erasmus Nov 22 '17 at 13:59
  • 1
    On ideone it gets terminated due to time constraint. – Maxim Egorushkin Nov 22 '17 at 14:03
  • 1
    `conditional_.wait(lock)` in `executionServiceHandler` could consume its own earlier `conditional_.notify_one()`, so the constructor never wakes up. It's unwise to have the same `condition_variable` guard two distinct conditions. – Igor Tandetnik Nov 22 '17 at 15:17
  • @IgorTandetnik, I'll modify and retest later, thanks – Werner Erasmus Nov 22 '17 at 15:57
  • 1
    @IgorTandetnik If there are no waiting threads on a condition variable when a notification call is made the notification is lost. – Maxim Egorushkin Nov 22 '17 at 22:13
  • @MaximEgorushkin Yes, right, I take my comment back. If the constructor gets to the mutex first, it'll be sitting in `wait` by the time `executionServiceHandler` calls `notify_one` for the first time, and should be woken up properly. If `executionServiceHandler` gets to the mutex first, then `inService_` will be true by the time the constructor checks it, and it won't wait at all. – Igor Tandetnik Nov 22 '17 at 22:19
  • The reason I wait in the constructor is that I want to prevent destruction prior to entering the thread. I think this can be improved by using a different variable and conditional variable that prevent destruction completion prior to thread exit. Constructor wait then redundant. The new code is working in VCC, I'll test in GCC. – Werner Erasmus Nov 23 '17 at 03:30
  • In this design the queue and the consumer are hard-coded together. A better design is to use a stand-alone producer-consumer queue class. With one producer and one consumer that queue can easily be wait-free (better than lock-free), e.g. [`boost::spsc`](http://www.boost.org/doc/libs/1_63_0/doc/html/lockfree/reference.html#header.boost.lockfree.spsc_queue_hpp). [An example of multiple producer multiple consumer queue](https://stackoverflow.com/a/5019461/412080). – Maxim Egorushkin Nov 23 '17 at 10:24

0 Answers0