5

I would like to serialize multithreaded invocations of a boost::signals2 signal in order to make sure that notifications about state changes from an object arrive at slots in a well defined order.

Background

I have an object with an internal state in a multithreaded program. Some parts of the internal state are interesting for other parts of the program, and the object exposes state changes by using a boost::signals2 signal, similar to this:

class ObjectWithState {
public:
    enum State {
        STATE_A,
        STATE_B,
        STATE_C,
    };

    void OnEvent() {
        State newState;
        {
            boost::lock_guard<boost::mutex> lock(m_Mutex);
            // Process event and change state
            m_State = ...;
            newState = m_State;
        }
        m_OnStateChanged(newState);
    }

    // method to allow external objects to connect to the signal etc
private:
    boost::signals2::signal<void (State) > m_OnStateChanged;
    boost::mutex m_Mutex;
    State m_State;
};

Problem

If there are multiple concurrent invocations of the OnEvent handler, this can potentially lead to listeners being notified about state changes in another order than the changes actually took place. The state itself is protected by a mutex like above, so the actual state is welldefined. However, the mutex cannot be held across the call to the signal, as this could lead to deadlocks. This means that the actual invocations of the signal might happen in any order, whereas I would require them to be called in the same order as the state changes have actually taken place.

One way to handle this problem would be to remove the State from the signal and just notify listeners that the state has changed. They could then query the object for its state and would get the state that the object had when the signal was fired or a later state. In my scenario, the listeners need to be informed about all state changes so this method won't work here.

My next approach would be something like the following:

class ObjectWithState {
public:
    enum State {
        STATE_A,
        STATE_B,
        STATE_C,
    };

    void OnEvent() {
        State newState;
        boost::unique_future<void> waitForPrevious;
        boost::shared_ptr<boost::promise<void> > releaseNext;
        {
            boost::lock_guard<boost::mutex> lock(m_Mutex);
            // Process event and change state
            m_State = ...;
            newState = m_State;
            waitForPrevious = m_CurrentInvocation->get_future();
            m_CurrentInvocation.reset(new boost::promise<void>());
            releaseNext = m_CurrentInvocation;
        }
        // Wait for all previous invocations of the signal to finish
        waitForPrevious.get();

        // Now it is our turn to invoke the signal
        // TODO: use try-catch / scoped object to release next if an exception is thrown
        OnStateChanged(newState);

        // Allow the next state change to use the signal
        releaseNext->set_value();
    }

    // method to allow external objects to connect to the signal etc
private:
    boost::signals2::signal<void (State) > m_OnStateChanged;
    boost::mutex m_Mutex;
    State m_State;
    // Initialized with a "fulfilled" promise in the constructor
    // or do special handling of initially empty promise above
    boost::shared_ptr<boost::promise<void> > m_CurrentInvocation;
};

I haven't tried the above code, so it might be littered with bugs and compilation errors, but it should be possible to deduce what I am after. My gut feeling tells me I'm not the first one to encounter this type of problem and I prefer using tried and tested code to my own... :) So my question is really:

Is there a preexisting way to achieve serialized invocations of a boost::signals2 signal (such as built into the signals2 library or a common pattern)?

villintehaspam
  • 8,540
  • 6
  • 45
  • 76
  • "However, the mutex cannot be held across the call to the signal, as this could lead to deadlocks." Could you clarify on this? Can some signal handlers change the state of `ObjectWithState`? Would they generate new `OnEvent()` calls? How do you make sure you do not enter infinite recursion? – user1202136 Apr 03 '12 at 15:03
  • @user1202136: Yes. a recursive call would be one way to get a deadlock. A more sneaky way is if you have two ObjectWithState and set up a handler for the state change that in some cases trigger a state change on the other. This leads to one thread holding the first mutex and trying to lock the other, and the other thread holding the other mutex and trying to lock the first. A general guideline is therefore to never hold a lock while calling unknown code, see http://drdobbs.com/article/print?articleId=202802983&siteSectionName= for a discussion. – villintehaspam Apr 04 '12 at 08:44

1 Answers1

0

I propose the following solution. Create a queue of pending signals and have a separate thread dispatch them. The code would roughly look as follows:

class ObjectWithState {
private:
    bool running;
    std::queue<State> pendingSignals;
    boost::condition_variable cond;
    boost::mutex mut;

    void dispatcherThread()
    {
        while (running)
        {
            /* local copy, so we don't need to hold a lock */
            std::vector<State> pendingSignalsCopy;

            /* wait for new signals, then copy them locally */
            {
                boost::unique_lock<boost::mutex> lock(mut);
                cond.wait(mut);
                pendingSignalsCopy = pendingSignals;
                pendingSignals.clear();
            }

            /* dispatch */
            while (!pendingSignalsCopy.empty())
            {
                State newState = pendingSignalsCopy.front();
                OnStateChanged(newState);
                pendingSignalsCopy.pop();
            }
        }
    }

public:
    void OnEvent()
    {
        State newState;
        ...

        /* add signal to queue of pending signals and wake up dispatcher thread */
        {
            boost::unique_lock<boost::mutex> lock(mut);
            pendingSignals.push(state);
            cond.notify_all();
        }
    }
};
villintehaspam
  • 8,540
  • 6
  • 45
  • 76
user1202136
  • 11,171
  • 4
  • 41
  • 62
  • This would partially achieve the same thing that I am going for, however I am not that thrilled about dedicating a separate thread per object. So the next thing would then be to share one thread for all objects - but then it's still a bit awkward to dedicate one thread for this, so you would want to distribute the work across your thread pool. Also, this implementation does not guarantee that all listeners have been notified when the OnStateChanged function exits, which may be an issue in some cases (but a feature in other). – villintehaspam Apr 04 '12 at 09:48
  • "Also, this implementation does not guarantee that all listeners have been notified when the OnStateChanged function exits" Why not? IIRC, boost::signals2::signal::operator() iterates through all slots and only then returns. Also, your proposal basically blocks all threads (except one) in waitForPrevious.get(). Therefore, it might actually be better to dedicate a thread for notifications. – user1202136 Apr 04 '12 at 13:09
  • replace OnStateChanged with OnEvent, sorry about the confusion. The difference in blocking is exactly what I am pointing out, in some cases blocking threads is necessary to ensure that the notification has taken place, in others it would be a feature to be able to let the threads move on. – villintehaspam Apr 04 '12 at 14:16