I'm trying to implement cooperative function timeout cancellation in C++, following recommendations in this answer. The idea is simple: if executing in separate thread function exceedes its allowed time limit, it terminates without writing to shared resource. Since its thread wouln't be used anymore, it gets detached.
Here's the code:
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <future>
#include <memory>
#include <thread>
class Inner
{
public:
Inner()
{
std::this_thread::sleep_for(std::chrono::milliseconds(150));
};
~Inner(){};
};
static std::unique_ptr<std::runtime_error>
func(std::stop_token stoken, std::shared_ptr<Inner> obj)
{
try {
std::shared_ptr<Inner> new_obj(new Inner());
if (!stoken.stop_requested()) {
obj.swap(new_obj);
}
return std::unique_ptr<std::runtime_error>(nullptr);
} catch (const std::runtime_error &err) {
return std::unique_ptr<std::runtime_error>(new std::runtime_error(err.what()));
}
}
class Wrapper
{
private:
std::shared_ptr<Inner> obj = nullptr;
public:
Wrapper(){};
~Wrapper(){};
void
cancellable_function(std::chrono::duration<uint32_t, std::micro> timeout)
{
std::packaged_task<std::unique_ptr<std::runtime_error>(std::stop_token, std::shared_ptr<Inner>)> task(
func);
auto result = task.get_future();
std::jthread thr(std::ref(task), obj);
if (result.wait_for(timeout) == std::future_status::ready) {
auto error = result.get();
if (error != nullptr)
throw *error;
} else {
thr.request_stop();
#ifdef DETACH
thr.detach();
#else
thr.join();
#endif
throw std::runtime_error("motor creation timeout");
}
}
};
int
main(void)
{
auto cls = std::shared_ptr<Wrapper>(new Wrapper());
try {
cls->cancellable_function(std::chrono::milliseconds(100));
assert(false);
} catch (const std::exception &) {
assert(true);
}
// wait for detached thread to finish
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}
In this code construction of the Inner
object gets timed out. When it does, thread thr
receives stop signal via stoken
and leaves without modifying shared resouce obj
.
When built with join
instead of detach
, this code does not raise vargrind errors:
g++ -Wall -Wextra -pedantic -Werror -std=gnu++20 cancel_function.cxx -o cancel_function
valgrind -q --tool=memcheck --leak-check=full --error-exitcode=69 --track-origins=yes --show-reachable=yes ./cancel_function
However, the version with detach
shows a lot of memory access errors (cannot insert all due to question size limitation):
g++ -D DETACH -Wall -Wextra -pedantic -Werror -std=gnu++20 cancel_function.cxx -o cancel_function
valgrind -q --tool=memcheck --leak-check=full --error-exitcode=69 --track-origins=yes --show-reachable=yes ./cancel_function
==95200== Thread 2:
==95200== Invalid write of size 8
==95200== at 0x110BEA: std::_Tuple_impl<0ul, std::runtime_error*, std::default_delete<std::runtime_error> >::_Tuple_impl(std::_Tuple_impl<0ul, std::runtime_error*, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200== by 0x110C12: std::tuple<std::runtime_error*, std::default_delete<std::runtime_error> >::tuple(std::tuple<std::runtime_error*, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200== by 0x110C41: std::__uniq_ptr_impl<std::runtime_error, std::default_delete<std::runtime_error> >::__uniq_ptr_impl(std::__uniq_ptr_impl<std::runtime_error, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200== by 0x10F58A: std::__uniq_ptr_data<std::runtime_error, std::default_delete<std::runtime_error>, true, true>::__uniq_ptr_data(std::__uniq_ptr_data<std::runtime_error, std::default_delete<std::runtime_error>, true, true>&&) (in /tmp/cancel_function)
==95200== by 0x10F5B0: std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> >::unique_ptr(std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200== by 0x1157E5: std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >::_M_set(std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200== by 0x115484: std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_state<Wrapper::cancellable_function(std::chrono::duration<unsigned int, std::ratio<1l, 1000000l> >)::{lambda(std::stop_token)#1}, std::allocator<int>, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > (std::stop_token)>::_M_run(std::stop_token&&)::{lambda()#1}, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >::operator()() const (in /tmp/cancel_function)
==95200== by 0x115271: std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter> std::__invoke_impl<std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_state<Wrapper::cancellable_function(std::chrono::duration<unsigned int, std::ratio<1l, 1000000l> >)::{lambda(std::stop_token)#1}, std::allocator<int>, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > (std::stop_token)>::_M_run(std::stop_token&&)::{lambda()#1}, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >&>(std::__invoke_other, std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_state<Wrapper::cancellable_function(std::chrono::duration<unsigned int, std::ratio<1l, 1000000l> >)::{lambda(std::stop_token)#1}, std::allocator<int>, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > (std::stop_token)>::_M_run(std::stop_token&&)::{lambda()#1}, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >&) (in /tmp/cancel_function)
...
I cannot understand, why the detached thread accessing memory after being cancelled. How could this code be fixed without using join
?