0

I'm trying to implement cooperative function timeout cancellation in C++, following recommendations in this answer. The idea is simple: if executing in separate thread function exceedes its allowed time limit, it terminates without writing to shared resource. Since its thread wouln't be used anymore, it gets detached.

Here's the code:

#include <cassert>
#include <chrono>
#include <condition_variable>
#include <future>
#include <memory>
#include <thread>

class Inner
{
      public:
    Inner()
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(150));
    };
    ~Inner(){};
};

static std::unique_ptr<std::runtime_error>
func(std::stop_token stoken, std::shared_ptr<Inner> obj)
{
    try {
        std::shared_ptr<Inner> new_obj(new Inner());
        if (!stoken.stop_requested()) {
            obj.swap(new_obj);
        }
        return std::unique_ptr<std::runtime_error>(nullptr);
    } catch (const std::runtime_error &err) {
        return std::unique_ptr<std::runtime_error>(new std::runtime_error(err.what()));
    }
}

class Wrapper
{
      private:
    std::shared_ptr<Inner> obj = nullptr;

      public:
    Wrapper(){};
    ~Wrapper(){};

    void
    cancellable_function(std::chrono::duration<uint32_t, std::micro> timeout)
    {
        std::packaged_task<std::unique_ptr<std::runtime_error>(std::stop_token, std::shared_ptr<Inner>)> task(
            func);

        auto         result = task.get_future();
        std::jthread thr(std::ref(task), obj);

        if (result.wait_for(timeout) == std::future_status::ready) {
            auto error = result.get();
            if (error != nullptr)
                throw *error;
        } else {
            thr.request_stop();
#ifdef DETACH
            thr.detach();
#else
            thr.join();
#endif
            throw std::runtime_error("motor creation timeout");
        }
    }
};

int
main(void)
{
    auto cls = std::shared_ptr<Wrapper>(new Wrapper());
    try {
        cls->cancellable_function(std::chrono::milliseconds(100));
        assert(false);
    } catch (const std::exception &) {
        assert(true);
    }

    // wait for detached thread to finish
    std::this_thread::sleep_for(std::chrono::seconds(1));

    return 0;
}

In this code construction of the Inner object gets timed out. When it does, thread thr receives stop signal via stoken and leaves without modifying shared resouce obj.

When built with join instead of detach, this code does not raise vargrind errors:

g++ -Wall -Wextra -pedantic -Werror -std=gnu++20 cancel_function.cxx -o cancel_function
valgrind -q --tool=memcheck --leak-check=full --error-exitcode=69 --track-origins=yes --show-reachable=yes ./cancel_function

However, the version with detach shows a lot of memory access errors (cannot insert all due to question size limitation):

g++ -D DETACH -Wall -Wextra -pedantic -Werror -std=gnu++20 cancel_function.cxx -o cancel_function
valgrind -q --tool=memcheck --leak-check=full --error-exitcode=69 --track-origins=yes --show-reachable=yes ./cancel_function
==95200== Thread 2:
==95200== Invalid write of size 8
==95200==    at 0x110BEA: std::_Tuple_impl<0ul, std::runtime_error*, std::default_delete<std::runtime_error> >::_Tuple_impl(std::_Tuple_impl<0ul, std::runtime_error*, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200==    by 0x110C12: std::tuple<std::runtime_error*, std::default_delete<std::runtime_error> >::tuple(std::tuple<std::runtime_error*, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200==    by 0x110C41: std::__uniq_ptr_impl<std::runtime_error, std::default_delete<std::runtime_error> >::__uniq_ptr_impl(std::__uniq_ptr_impl<std::runtime_error, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200==    by 0x10F58A: std::__uniq_ptr_data<std::runtime_error, std::default_delete<std::runtime_error>, true, true>::__uniq_ptr_data(std::__uniq_ptr_data<std::runtime_error, std::default_delete<std::runtime_error>, true, true>&&) (in /tmp/cancel_function)
==95200==    by 0x10F5B0: std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> >::unique_ptr(std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200==    by 0x1157E5: std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >::_M_set(std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200==    by 0x115484: std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_state<Wrapper::cancellable_function(std::chrono::duration<unsigned int, std::ratio<1l, 1000000l> >)::{lambda(std::stop_token)#1}, std::allocator<int>, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > (std::stop_token)>::_M_run(std::stop_token&&)::{lambda()#1}, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >::operator()() const (in /tmp/cancel_function)
==95200==    by 0x115271: std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter> std::__invoke_impl<std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_state<Wrapper::cancellable_function(std::chrono::duration<unsigned int, std::ratio<1l, 1000000l> >)::{lambda(std::stop_token)#1}, std::allocator<int>, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > (std::stop_token)>::_M_run(std::stop_token&&)::{lambda()#1}, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >&>(std::__invoke_other, std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_state<Wrapper::cancellable_function(std::chrono::duration<unsigned int, std::ratio<1l, 1000000l> >)::{lambda(std::stop_token)#1}, std::allocator<int>, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > (std::stop_token)>::_M_run(std::stop_token&&)::{lambda()#1}, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >&) (in /tmp/cancel_function)
...

I cannot understand, why the detached thread accessing memory after being cancelled. How could this code be fixed without using join?

Sergey
  • 1,166
  • 14
  • 27
  • Unrelated: Why are you putting a `;` after the `}` in function definitions? (and I agree with the comment right below - detaching threads is usually not a good idea) – Ted Lyngmo Jun 30 '23 at 13:04
  • 2
    Detaching threads is a sure way to shoot yourself in the foot. You loose all ability to synchronize at shutdown. So main is deleting all kind of stuff at (and after) exit while your thread "happily" keeps accessing data that's no longer there. So NEVER use detach. Always use join (or use std::acync/std::future, or std::jthread) and let your threads cooperatively shutdown at program exit – Pepijn Kramer Jun 30 '23 at 13:04
  • 1
    The problem is the `task` variable, because it's a local variable in your function, and that you can't destroy the object at the end of the function if the thread still accesses it. (And `std::packaged_task` does access the object itself after your routine has completed in order to store the result in the promise/future.) You have to somehow ensure that the `task` variable survives the end of your function if you want to detach the thread. – chris_se Jun 30 '23 at 13:09
  • 1
    Also, your `.swap()` doesn't actually do what you think it does... – chris_se Jun 30 '23 at 13:10
  • @PepijnKramer Well, actually synchronising at shutdown is still possible, but one needs to go the hard way of doing all of by hand. `join`ing is just so much more convenient and *safer*… – Aconcagua Jun 30 '23 at 13:25
  • @Aconcagua I know you can. But OP is obviously just starting with threads. So I didn't want to show things like std::condition_variable just yet ;) – Pepijn Kramer Jun 30 '23 at 13:29
  • Your `cancellable_function` _waits_ for the task to be completed or, for the timer to expire. Meanwhile, the `func` cannot possibly be stopped from doing whatever when the timeout expires unless it periodically checks the `stop_requested()`. Why not simply have `cancellable_function` do the work, and have it periodically check whether or not it has taken too long? What's the point of creating a new thread if the function that created it does not do something else concurrently with the new thread? – Solomon Slow Jun 30 '23 at 14:40

2 Answers2

2

The problem is that the task variable is only passed in as a reference to the std::jthread, so that when you detach it, and the function ends, the variable will fall out of scope and the underlying object will be deleted. However, once the function finally ends in the detached thread it will still try to access the task variable you passed in as a reference, but the underlying object has already been deleted.

What you can do is move the packaged_task into the std::jthread so that it will only be destroyed once the actual thread ends:

    std::jthread thr(std::move(task), obj);

Then detaching will work.

However: there are some issues in your code that don't make much sense:

  • You are using std::unique_ptr<std::runtime_error> -- why? There's already std::exception_ptr that you can use if you want to wrap exceptions. Also, why use it explicitly anyway? std::future can hold exceptions just fine, why the explicit handling here?

  • .swap() doesn't do what you want: it will swap the local function variable for the local function parameter - and since both are local variables, this will have effectively no effect... You could of course swap the contents of the shared pointers (std::swap(*obj, *new_obj);), but that's not adviseable because:

  • Your abort check has an inherent race condition. What if in the thread you started the check notices that it wasn't aborted, then the main thread aborts at that point, and then the swap happens. That's bad.

I think a much cleaner (and simpler) way of doing this is simply relying on std::future to return the value that you wanted to create and then use that value. If an error occurs, std::future will also forward any exceptions to you. That'll make your code much simpler:

static std::shared_ptr<Inner>
func(std::stop_token token)
{
#if THROW_EXCEPTION
    throw std::runtime_error("Some error happened");
#endif
    std::shared_ptr<Inner> new_obj(new Inner());
    if (token.stop_requested())
        throw std::runtime_error("Aborted");
    return new_obj;
}

class Wrapper
{
      private:
    std::shared_ptr<Inner> obj = nullptr;

      public:
    Wrapper(){};
    ~Wrapper(){};

    void
    cancellable_function(std::chrono::duration<uint32_t, std::micro> timeout)
    {
        std::packaged_task<std::shared_ptr<Inner>(std::stop_token)> task(func);

        auto result = task.get_future();
        std::jthread thr(std::move(task));

        if (result.wait_for(timeout) == std::future_status::ready) {
            // result.get() will throw if there was an exception
            // in the code
            obj = result.get();
        } else {
            thr.request_stop();
#ifdef DETACH
            thr.detach();
#else
            thr.join();
#endif
            throw std::runtime_error("motor creation timeout");
        }
    }
};

However, if you can't really interrupt the operation at all in the middle and can only check at the very end if the stop_token was used or not, why bother with this complication? Just use std::async:

static std::shared_ptr<Inner>
func()
{
#if THROW_EXCEPTION
    throw std::runtime_error("Some error happened");
#endif
    std::shared_ptr<Inner> new_obj(new Inner());
    return new_obj;
}

class Wrapper
{
      private:
    std::shared_ptr<Inner> obj = nullptr;

      public:
    Wrapper(){};
    ~Wrapper(){};

    void
    cancellable_function(std::chrono::duration<uint32_t, std::micro> timeout)
    {
        auto result = std::async(func);
        if (result.wait_for(timeout) == std::future_status::ready) {
            // result.get() will throw if there was an exception
            // in the code
            obj = result.get();
        } else {
            throw std::runtime_error("motor creation timeout");
        }
    }
};

Of course if you do have possible cancellation points in your routine that can be checked periodically, then using the jthread + packaged_task variant is probably better (because as far as I know std::async doesn't have any cancellation support).

chris_se
  • 1,006
  • 1
  • 7
0

(In principle): Never use detach, always make sure threads can cooperatively stop their work at shutdown. And then synchronize with the stopping of those threads. e.g std::thread::join(), std::jthread::~jthread() (C++20), or a combination of std::future/std::async

Pepijn Kramer
  • 9,356
  • 2
  • 8
  • 19
  • True, but join will wait for the thread to finish what it's doing, so function potentially could block indefinitely. – Sergey Jun 30 '23 at 14:19
  • @Sergey Hence my comment on cooperatively stop. The thread should be send a signal to stop and the thread should do so the sleep should be replaced with a timed wait on a condition variable and return early when requested to do so. It is IMO the only way to build a stable system (nobody said threading was going to be a free lunch) – Pepijn Kramer Jun 30 '23 at 14:22