
In the compilable code below, I send a Query to be handled by another thread and then wait for either a response or a timeout. I don't understand why wait_until appears to miss the notification and reports a timeout when it should not. It only happens when the handler returns a response REALLY fast. How do you propose I fix the code below?

#include <mutex>
#include <memory>
#include <condition_variable>
#include <atomic>
#include <thread>
#include <iostream>
#include <queue>
#include <chrono>
#include <cstdint>

class Question
{

};

class Answer
{
public:
    bool isAnswered = false;
};

class Query
{
    std::condition_variable     _cv;
    std::mutex                  _mutex;
    std::atomic_bool            _questionAnswered{false};
    std::atomic_bool            _questionSet{false};

    std::shared_ptr<Question>   _question;
    std::shared_ptr<Answer>     _answer;

public:
    void setQuestion(std::shared_ptr<Question> & question)
    {
        if(!_questionSet)
        {
            _question = question;
            _questionSet = true;
        }
    };

    void setAnswer(std::shared_ptr<Answer> answer)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        if(!_questionAnswered)
        {
            // Set the answer and notify the getAnswerWithTimeout() to unlock if holding
            _answer = answer;
            _questionAnswered = true;
            lock.unlock();
            _cv.notify_all();
        }
    };

    std::shared_ptr<Answer> getAnswerWithTimeout(uint64_t micros)
    {
        std::unique_lock<std::mutex> lock(_mutex);

        if(!_questionAnswered)
        {
            auto now = std::chrono::system_clock::now();

            // When timeout occurs, lock down this class, set the answer as null, and set error to timeout
            if (!_cv.wait_until(lock, now + std::chrono::microseconds(micros), [&]() { return (bool)_questionAnswered; }) )
            {
                _answer = nullptr;
                _questionAnswered = true;
            }
        }

        return _answer;
    };
};

void function_to_run(std::shared_ptr<Query> query)
{
    // Respond to query and set the answer
    auto answer = std::make_shared<Answer>();
    answer->isAnswered = true;

    // Set the response answer
    query->setAnswer(answer);
}



std::queue<std::shared_ptr<Query>> queryHandler;
bool keepRunning = true;
std::mutex queryHandlerMutex;
std::condition_variable queryHandlerCv;

void handleQueryHandler()
{
    while (true)
    {
        std::shared_ptr<Query> query;

        {
            std::unique_lock<std::mutex> lock(queryHandlerMutex);
            queryHandlerCv.wait(lock, [&] { return !keepRunning || !queryHandler.empty(); });
            if (!keepRunning) {
                return;
            }
            // Pop off item from queue
            query = queryHandler.front();
            queryHandler.pop();
        }

        // Process query with function
        function_to_run(query);
    }
}

void insertIntoQueryHandler(std::shared_ptr<Query> & query)
{
    {
        std::unique_lock<std::mutex> lock(queryHandlerMutex);
    
        // Insert into Query Handler
        queryHandler.emplace(query);
    }
    // Notify query handler to start if locked on empty
    queryHandlerCv.notify_one();
}

std::shared_ptr<Answer>
ask(std::shared_ptr<Query> query, uint64_t timeoutMicros=0)
{
    std::shared_ptr<Answer> answer = nullptr;

    // Send Query to be handled by external thread
    insertIntoQueryHandler(query);

    // Hold for the answer to be returned with timeout period
    answer = query->getAnswerWithTimeout(timeoutMicros);

    return answer;
}


int main()
{
    // Start Up Query Handler thread to handle Queries
    std::thread queryHandlerThread(handleQueryHandler);

    // Create queries in a bounded loop and process each one
    for(int i = 0; i < 1000000; i++)
    {
        auto question = std::make_shared<Question>();
        auto query = std::make_shared<Query>();
        query->setQuestion(question);

        auto answer = ask(query, 1000);
        if(!answer)
        {
            std::cout << "Query Timed out after 1000us" << std::endl;
        }
    }
    // Stop the thread
    {
        std::unique_lock<std::mutex> lock(queryHandlerMutex);
        keepRunning = false;
    }
    queryHandlerCv.notify_one();
    queryHandlerThread.join();
    return 0;
}
Moe Bataineh
  • You forgot to lock `queryHandlerMutex` when inserting into and removing from the queue. In other words, your queue is not actually protected by the mutex. – jtbandes Dec 02 '20 at 20:45
  • I suggest building your code with tools such as the [Thread Sanitizer](https://clang.llvm.org/docs/ThreadSanitizer.html) (`-fsanitize=thread`), which will help catch issues like this automatically. – jtbandes Dec 02 '20 at 20:52
  • @jtbandes The code is pretty synchronous in this fashion so in this example there won't be a case where this is an issue. The main one is that of the wait_until is missing that signal from really fast responses when setAnswer is called. – Moe Bataineh Dec 02 '20 at 20:53
  • queryHandler.emplace and pop need to be mutexed, at the very least – Den-Jason Dec 02 '20 at 20:54
  • I understand your sentiment, but unfortunately that's not how concurrent programming works. The compiler and CPU are free to change your program's behavior in many ways which are difficult to predict (but which make correct programs run more efficiently). Tools like `mutex` and `condition_variable` are ways to restrict those changes so certain safety and correctness guarantees are maintained, to make sure your program works as intended. **`std::queue` is not thread safe, so you must synchronize accesses to it with locking primitives**; you can't simply say "this isn't an issue in this case"! – jtbandes Dec 02 '20 at 20:57
  • @jtbandes I understand. In my real codebase, I actually made a wrapper class for queue to be thread safe and that's what I was using. This code I took out from my real codebase and made a quick example of! I am using mutexing in the real deal. – Moe Bataineh Dec 02 '20 at 20:59
  • Well, when I modified this program to use mutexes correctly, the issue stopped happening. So you will need to provide a more accurate minimal reproducible example. – jtbandes Dec 02 '20 at 21:00
  • @jtbandes Interesting! I did the same thing and the issue still persists. Right above the pop statement and right above the emplace statement. Can you try running it in release mode? I would say this example is pretty minimal in my case. – Moe Bataineh Dec 02 '20 at 21:02
  • The timeout period seems to be very short - what happens with `_cv.wait_until` if some other processes steal the CPU for 10ms between grabbing "now" to waiting on "now + 1ms"? Would it just immediately bail even if there's a pending answer? – Den-Jason Dec 02 '20 at 21:03
  • Have you tried running it with `-fsanitize=thread`? Immediately above pop is not enough; the `queryHandler.empty()` check also needs to be protected. You can restructure the code to use that as a predicate to `queryHandlerCv.wait`. – jtbandes Dec 02 '20 at 21:04
  • @jtbandes, You're right, I'm being dumb. The issue still persists. I think the issue only happens if it runs so quickly that it's missing some signal, as Den-Jason is saying. If you run with `-fsanitize=thread` and it slows down the execution tenfold, you may not see the issue. It still happens with `-fsanitize=thread`, but less frequently. – Moe Bataineh Dec 02 '20 at 21:06
  • Also the `_cv.notify_all();` doesn't need to be in the mutex – Den-Jason Dec 02 '20 at 21:11
  • Check out the changes I made here: https://gist.github.com/jtbandes/c3e4ea89da4a01c7ff537b4e2656d495/revisions The latest one ensures `function_to_run` is run while the mutex is _not_ locked. (I limited the main query loop to a specific number of iterations to prove the program exits cleanly, which helps when using the thread sanitizer, but you can increase that limit arbitrarily.) I also agree that `notify_one` shouldn't be in the mutex, and I've fixed that too — read more about this under Notes at https://en.cppreference.com/w/cpp/thread/condition_variable/notify_one – jtbandes Dec 02 '20 at 21:12
  • @jtbandes, Running the code still produced the weird timeouts. But instead of it happening 87 times in your 100k loop, it happens 0 - 5 times (with `-fsanitize=thread` enabled). The sanitizer is not reporting any errors for me. Turn off `-fsanitize=thread` for a moment and run in release mode. Does the wait_until time out for you? I feel like this thread deviated from the real issue to a cleanup issue, but the changes to get it to exit successfully were useful for running the sanitizer, and I thank you for that. – Moe Bataineh Dec 02 '20 at 21:23
  • What happens if you increase the timeout to, say, 100000us? – Den-Jason Dec 02 '20 at 21:32
  • Is it possible that your machine is overloaded with other processes and it just legitimately takes some time for the thread to be awakened? Can you try some other optimizations like pulling items off the queue in batches or adding multiple worker threads? Can you profile the code to see _where_ the time is being spent? At this point, I would venture to say the code is basically correct, but guesswork is not usually the best way to improve performance :) – jtbandes Dec 02 '20 at 21:40
  • @Den-Jason This is really weird. If I set it to anything below 10000, then I get the timeout error. The weird thing is that if I set it to something like 5000, I get the timeout more often, and it does NOT seem to be respecting the timeout, i.e. actually waiting for the timeout period. It almost seems like wait_until is being forced to return early, returning false without waiting for the full duration. – Moe Bataineh Dec 02 '20 at 21:44
  • @MoeBataineh Your platform may have a high accuracy mode for "short" timeouts and a low accuracy mode for longer timeouts. Things may get strange as you stay close to that border. You really shouldn't use timeouts less than 20ms or so as behavior will tend to be unpredictable based on system power management and load. – David Schwartz Dec 02 '20 at 21:46
  • @DavidSchwartz, it does seem really unpredictable and it does resolve with higher timeouts. So what is the proper solution to actually get timeouts that are accurate to the millisecond? If I send a query to another thread to be processed and it completes the query in 100us, then I would like to have finer control so I can set a timeout period of 1ms. – Moe Bataineh Dec 02 '20 at 21:49
  • @MoeBataineh You can't. A general purpose operating system can't handle such small timeouts. What should it do? It can't schedule another task as it's unfair to that task to give it such a small timeslice (it will perform badly due to being descheduled after it warms the caches). Should it just burn the CPU? Then the thread that's burning the CPU will rapidly use up its timeslice and get de-scheduled. You can use administrative powers to grant a process the right to monopolize CPU resources like that, but it's not allowed by default. – David Schwartz Dec 02 '20 at 21:51
  • @DavidSchwartz That was exactly the issue. In my main codebase, I changed all the timeouts to be minimum 20000us and it solved all my issues. I'm going to have to do more reading about what happens a little more underneath the hood. Do you have any good resources? Make an answer and I will vote it up. – Moe Bataineh Dec 02 '20 at 22:07
  • One more question - what platform is this running on? It appears there are issues with implementations of `wait_until` - see https://stackoverflow.com/questions/39041450/stdcondition-variable-wait-until-surprising-behaviour – Den-Jason Dec 02 '20 at 22:12
  • In any case I would advocate using `wait_for`, which brings me back to my original premise - ***only use a condition variable to efficiently bail quickly from a poll loop*** – Den-Jason Dec 02 '20 at 22:15
  • Finally, bear in mind that shared_pointers are not thread-safe either, and may need to be mutex protected across threads. – Den-Jason Dec 02 '20 at 22:17
  • @Den-Jason I know. This was just a quick example I wrote up. I'm aware of the challenges of multi-threading applications and I work with a heavily threaded application at the moment. I really appreciate all your input. It was much appreciated. – Moe Bataineh Dec 02 '20 at 22:19
  • @MoeBataineh this was useful for me too; I learnt about `wait_until` and that it doesn't work in some scenarios. Plus lots of SO people like prefixing class members with underscores :P – Den-Jason Dec 02 '20 at 22:31

1 Answer

As discussed in the comments, the main issue here is the very short timeout period you're using (1ms). Consider the gap between these two statements:

     auto now = std::chrono::system_clock::now();

.... another thread may sneak in here ....

     if (!_cv.wait_until(lock, now + std::chrono::microseconds(micros), [&]() { return (bool)_questionAnswered; }) )
     {

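Another thread can be scheduled in that gap and consume a whole timeslice (e.g. 10ms). By the time wait_until is finally called, the deadline computed from `now` has already passed, so it times out immediately. Furthermore, there are reports of unexpected behaviour with wait_until, as described here: std::condition_variable wait_until surprising behaviour (https://stackoverflow.com/questions/39041450/stdcondition-variable-wait-until-surprising-behaviour)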

Increasing the timeout to something in the order of several timeslices will fix this. You can also adjust thread priorities.

Personally I advocate polling a condition variable with wait_for which is efficient and also bails in a timely fashion (as opposed to polling a flag and sleeping).
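As a minimal sketch of the wait_for approach (not a drop-in replacement for your class): `AnswerSlot` below is a hypothetical, simplified stand-in for `Query`, and I have swapped the atomics for a plain `bool` guarded by the mutex. The predicate overload of wait_for measures the relative timeout against a steady clock, so the deadline is not affected by adjustments to the system clock.

```cpp
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>

struct Answer { bool isAnswered = false; };

// Hypothetical, simplified stand-in for the Query class in the question.
class AnswerSlot
{
    std::mutex              _mutex;
    std::condition_variable _cv;
    std::shared_ptr<Answer> _answer;
    bool                    _done = false;   // guarded by _mutex

public:
    void setAnswer(std::shared_ptr<Answer> answer)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            if (_done) return;               // already answered or timed out
            _answer = std::move(answer);
            _done = true;
        }
        _cv.notify_all();                    // notify after releasing the lock
    }

    std::shared_ptr<Answer> getAnswerWithTimeout(std::chrono::microseconds timeout)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        // wait_for returns the predicate's value: false means the wait timed out.
        if (!_cv.wait_for(lock, timeout, [this] { return _done; }))
        {
            _done = true;                    // a late setAnswer() becomes a no-op
        }
        return _answer;                      // nullptr on timeout
    }
};
```

This only tidies up how the deadline is expressed. It does not make a 1ms timeout reliable, since the waiting thread can still be descheduled for longer than that.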


Time slices in non-RTOS systems tend to be in the order of 10ms, so I would not expect such short timeouts to work accurately and predictably in general-purpose systems. See this for an introduction to pre-emptive multitasking: https://www.geeksforgeeks.org/time-slicing-in-cpu-scheduling/

as well as this: http://dev.ti.com/tirex/explore/node?node=AL.iEm6ATaD6muScZufjlQ__pTTHBmu__LATEST
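If you want to see how coarse short timeouts are on your own machine, a rough sketch like the following can help. `measureTimedWait` is a hypothetical helper name of mine; it blocks on a condition variable that is never notified and reports how long the wait really took.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper: returns how long a timed wait that is never
// notified actually blocks, measured with a steady clock.
std::chrono::microseconds measureTimedWait(std::chrono::microseconds timeout)
{
    std::mutex m;
    std::condition_variable cv;
    std::unique_lock<std::mutex> lock(m);

    auto start = std::chrono::steady_clock::now();
    // The predicate is never true, so this call always ends by timing out.
    cv.wait_for(lock, timeout, [] { return false; });
    auto end = std::chrono::steady_clock::now();

    return std::chrono::duration_cast<std::chrono::microseconds>(end - start);
}
```

Calling `measureTimedWait(std::chrono::microseconds(1000))` repeatedly, both on an idle and on a loaded machine, and comparing the results with the requested 1000us gives a feel for how much scheduling latency your timeouts need to absorb.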


As jtbandes points out, it's worth using tools such as Clang's thread sanitiser to check for potential logic races: https://clang.llvm.org/docs/ThreadSanitizer.html

Den-Jason