In the compilable code below, I send a Query message to be handled by another thread, and I want to wait for a response or give up once a timeout period has elapsed. I don't know why wait_until appears to miss the notification and runs into the timeout when it should not. It only happens when the handler returns a response REALLY fast. How do you propose I fix the code below?
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

#include <zconf.h>
/// Payload describing what is being asked. It currently carries no data.
class Question {};
/// Result object handed back for a query.
class Answer
{
public:
    /// Starts out false; whoever produces the answer is expected to set it.
    bool isAnswered{false};
};
/// One question/answer exchange between an asking thread and a responder.
///
/// The asker blocks in getAnswerWithTimeout() while another thread delivers
/// the result through setAnswer(). Whichever happens first -- an answer or the
/// timeout -- settles the query; any setAnswer() call made after the query is
/// settled is ignored.
class Query
{
    std::condition_variable _cv;
    std::mutex _mutex;  // guards _answer and the transition of _questionAnswered
    // Explicit initializers: before C++20 a default-initialized std::atomic
    // holds an indeterminate value, so `Query q;` would start in a random state.
    std::atomic_bool _questionAnswered{false};
    std::atomic_bool _questionSet{false};
    std::shared_ptr<Question> _question;
    std::shared_ptr<Answer> _answer;

public:
    /// Stores the question. Only the first call has an effect; later calls
    /// leave the stored question untouched.
    ///
    /// @param question  Question to attach to this query.
    void setQuestion(const std::shared_ptr<Question>& question)
    {
        if (!_questionSet)
        {
            _question = question;
            _questionSet = true;
        }
    }

    /// Delivers the answer and wakes every thread blocked in
    /// getAnswerWithTimeout(). Does nothing if the query is already settled
    /// (answered earlier, or timed out).
    ///
    /// @param answer  Answer to publish to the waiting thread.
    void setAnswer(std::shared_ptr<Answer> answer)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            if (_questionAnswered)
            {
                return;
            }
            _answer = std::move(answer);
            _questionAnswered = true;
        }
        // Notify after the mutex is released so a woken waiter can take it
        // straight away instead of blocking on it again.
        _cv.notify_all();
    }

    /// Waits until an answer has been delivered or the timeout elapses.
    ///
    /// The wait uses wait_for(), which is measured against a steady clock, so
    /// adjustments of the wall clock neither shorten nor extend the timeout.
    /// On timeout the query is marked as settled with a null answer, which
    /// makes a late setAnswer() a no-op.
    ///
    /// @param micros  Maximum time to wait, in microseconds. Values above one
    ///                year are clamped to one year so the conversion to the
    ///                signed chrono representation cannot wrap negative and
    ///                the deadline arithmetic cannot overflow.
    /// @return The delivered answer, or nullptr if the wait timed out.
    std::shared_ptr<Answer> getAnswerWithTimeout(uint64_t micros)
    {
        using std::chrono::microseconds;

        const microseconds maxWait = std::chrono::hours(24 * 365);
        const microseconds timeout =
            (micros < static_cast<uint64_t>(maxWait.count()))
                ? microseconds(static_cast<microseconds::rep>(micros))
                : maxWait;

        std::unique_lock<std::mutex> lock(_mutex);
        // The predicate is evaluated before blocking, so an answer that
        // arrived before this call is returned immediately.
        const bool answered = _cv.wait_for(
            lock, timeout, [this] { return _questionAnswered.load(); });
        if (!answered)
        {
            _answer = nullptr;
            _questionAnswered = true;
        }
        return _answer;
    }
};
void function_to_run(std::shared_ptr<Query> query)
{
// Respond to query and set the answer
auto answer = std::make_shared<Answer>();
answer->isAnswered = true;
// Set the response answer
query->setAnswer(answer);
}
// Queries waiting to be processed by the handler thread.
std::queue<std::shared_ptr<Query>> queryHandler{};
// While true the handler thread keeps serving queries; cleared to stop it.
bool keepRunning{true};
// Held whenever queryHandler or keepRunning is read or modified.
std::mutex queryHandlerMutex{};
// Signalled when a query has been queued or a shutdown has been requested.
std::condition_variable queryHandlerCv{};
/// Body of the handler thread: repeatedly takes the oldest queued query and
/// processes it with function_to_run(). Returns as soon as it observes that
/// keepRunning has been cleared, even if queries are still queued.
void handleQueryHandler()
{
    for (;;)
    {
        std::shared_ptr<Query> next;
        {
            std::unique_lock<std::mutex> guard(queryHandlerMutex);
            // Sleep until there is work to do or a shutdown was requested.
            while (keepRunning && queryHandler.empty())
            {
                queryHandlerCv.wait(guard);
            }
            if (!keepRunning)
            {
                return;
            }
            next = queryHandler.front();
            queryHandler.pop();
        }
        // The queue lock is released here, so producers are not blocked while
        // the query is being processed.
        function_to_run(next);
    }
}
/// Appends a query to the shared work queue and wakes one thread waiting on
/// queryHandlerCv.
void insertIntoQueryHandler(std::shared_ptr<Query> & query)
{
    {
        std::lock_guard<std::mutex> guard(queryHandlerMutex);
        queryHandler.push(query);
    }
    // Signal outside the critical section so the woken thread can acquire the
    // mutex without contending with this one.
    queryHandlerCv.notify_one();
}
std::shared_ptr<Answer>
ask(std::shared_ptr<Query> query, uint64_t timeoutMicros=0)
{
std::shared_ptr<Answer> answer = nullptr;
// Send Query to be handled by external thread
insertIntoQueryHandler(query);
// Hold for the answer to be returned with timeout period
answer = query->getAnswerWithTimeout(timeoutMicros);
return answer;
}
int main()
{
// Start Up Query Handler thread to handle Queries
std::thread queryHandlerThread(handleQueryHandler);
// Create queries in infinite loop and process
for(int i = 0; i < 1000000; i++)
{
auto question = std::make_shared<Question>();
auto query = std::make_shared<Query>();
query->setQuestion(question);
auto answer = ask(query, 1000);
if(!answer)
{
std::cout << "Query Timed out after 1000us" << std::endl;
}
}
// Stop the thread
{
std::unique_lock<std::mutex> lock(queryHandlerMutex);
keepRunning = false;
}
queryHandlerCv.notify_one();
queryHandlerThread.join();
return 0;
}