
In the following example, mEventExecutors is a std::vector<std::future<void>>. I would like to be able to remove the futures from the vector as they complete. Can this be done?

void RaiseEvent(EventID messageID)
{
    mEventExecutors.push_back(std::move(std::async([=]{
            auto eventObject = mEventListeners.find(messageID);
            if (eventObject != mEventListeners.end())
            {
                for (auto listener : eventObject->second)
                {
                    listener();
                }
            }
        })
    ));
}
Rakete1111
tuskcode
  • FYI: You don't actually need to `std::move` there, the return value is already a prvalue. – Rakete1111 Jul 15 '17 at 11:06
  • Are you sure? cppreference doesn't state that std::async returns an rvalue. In fact, I would have assumed that the following statement suggests that it does need to be moved: _If the std::future obtained from std::async is not moved from or bound to a reference, the destructor of the std::future will block at the end of the full expression until the asynchronous operation completes, essentially making code such as the following synchronous_ – tuskcode Jul 15 '17 at 11:18
  • Yes. `std::move` doesn't actually move anything (confusing, right?). It just casts the passed value to a xvalue, which enables move semantics for it. `std::async` returns a simple `std::future<...>` if you look at the signature. It's not a reference, which would be an lvalue, but it is a temporary, and so `std::move` is unnecessary (why cast a rvalue to an rvalue?). – Rakete1111 Jul 15 '17 at 11:22
  • So by default, temporaries will be moved? Is this part of RVO? – tuskcode Jul 15 '17 at 11:23
  • Yes. No, RVO and NRVO are optimization techniques used by the compiler (which are allowed by the Standard). You might want to read [this question](https://stackoverflow.com/questions/17473753/c11-return-value-optimization-or-move). – Rakete1111 Jul 15 '17 at 11:25
  • Cool - thanks for the link. I will be more calculated with my application of explicit move operation! – tuskcode Jul 15 '17 at 11:29
  • The above code appears to be full of synchronization bugs. You access `mEventListeners` in a way that would be basically impossible to synchronize. One way to treat threads in C++ is "assume nothing is done -- the task is deferred -- until you wait on it"; is your program still correct? Do you *need* the listeners to be notified at any particular time, or ever? If not, you could write code that self-deregisters at thread exit or some such, plus a thread cleaning up dead threads and some synchronization. – Yakk - Adam Nevraumont Jul 15 '17 at 11:35
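To make the original question concrete: one way to drop finished futures is to poll each one with a zero timeout and erase those that report ready. The following is a minimal sketch, not a definitive implementation. `prune_completed` is a hypothetical helper name, and the sketch assumes every future was created with `std::launch::async` (a deferred task would never report ready here) and that the vector is only touched from one thread at a time.

```cpp
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <future>
#include <vector>

// Hypothetical helper: erase every future whose task has already finished.
// Assumes the futures came from std::async(std::launch::async, ...), so
// wait_for never reports future_status::deferred.
// Returns the number of futures removed.
inline std::size_t prune_completed(std::vector<std::future<void>>& futures)
{
    auto is_done = [](const std::future<void>& f) {
        // A moved-from (invalid) future is treated as done as well.
        return !f.valid() ||
               f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
    };
    // Standard erase-remove idiom; std::future is move-only, which
    // std::remove_if supports because it move-assigns the kept elements.
    auto first_done = std::remove_if(futures.begin(), futures.end(), is_done);
    auto removed = static_cast<std::size_t>(futures.end() - first_done);
    futures.erase(first_done, futures.end());
    return removed;
}
```

A call such as `prune_completed(mEventExecutors)` at the top of `RaiseEvent` would keep the vector from growing without bound. Note that erasing a ready future discards any exception stored in it, and that `mEventExecutors.push_back(std::async(std::launch::async, ...))` compiles without `std::move`, as discussed in the comments above.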

2 Answers


The question itself has been answered by someone else, but it piqued my curiosity as to how one could implement a fully functional, thread-safe task manager in a minimal number of lines of code.

I also wondered whether it would be possible to wait on the tasks as futures, or optionally provide a callback function.

Then of course this raised the question of whether those futures could use the sexy continuation syntax of .then(xxx) rather than blocking the code.

Here is my attempt.

Much kudos to Christopher Kohlhoff, the author of boost::asio. By studying his awesome work, I learned the value of separating classes into:

  • handle - controls the lifetime of the object
  • service - provides object logic, state shared amongst object impls, and manages the lifetimes of implementation objects should they outlive the handle (anything that relies on a callback usually does), and
  • implementation - provides per-object state.
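As a rough illustration of the three roles listed above, here is a deliberately tiny sketch that is separate from the task manager code further down. All names (`greeter`, `greeter_service`, `greeter_impl`) are hypothetical, and the live-object counter is not thread-safe; it is only there to show the service observing implementation lifetimes.

```cpp
#include <memory>
#include <string>
#include <utility>

// Implementation: per-object state only.
struct greeter_impl {
    std::string name;
};

// Service: object logic, plus creation and destruction of implementations.
struct greeter_service {
    struct deleter {
        greeter_service* service;
        void operator()(greeter_impl* p) const { service->destroy(p); }
    };
    using impl_type = std::unique_ptr<greeter_impl, deleter>;

    // One shared service instance (an assumption made for this sketch).
    static greeter_service& use() {
        static greeter_service instance;
        return instance;
    }

    impl_type construct(std::string name) {
        ++live_;
        return impl_type(new greeter_impl{std::move(name)}, deleter{this});
    }
    void destroy(greeter_impl* p) {
        --live_;
        delete p;
    }
    std::string greet(const greeter_impl& impl) const {
        return "hello, " + impl.name;
    }
    int live() const { return live_; }

private:
    int live_ = 0;  // not thread-safe; illustration only
};

// Handle: owns the implementation and forwards all logic to the service.
class greeter {
public:
    explicit greeter(std::string name)
        : service_(&greeter_service::use()),
          impl_(service_->construct(std::move(name))) {}

    std::string greet() const { return service_->greet(*impl_); }

private:
    greeter_service* service_;         // declared first, so initialized first
    greeter_service::impl_type impl_;  // unique_ptr: movable, not copyable
};
```

Because the handle holds a `unique_ptr` with a deleter that calls back into the service, the service decides what "destroy" means. In the task manager below, that hook is where the service waits for outstanding tasks before deleting the implementation.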

So here's an example of calling the code:

int main() {
    task_manager mgr;

    // an example of using async callbacks to indicate completion and error
    mgr.submit([] {
                   emit("task 1 is doing something");
                   std::this_thread::sleep_for(1s);
                   emit("task 1 done");
               },
               [](auto err) {
                   if (not err) {
                       emit("task 1 completed");
                   } else {
                       emit("task 1 failed");
                   }
               });

    // an example of returning a future (see later)
    auto f = mgr.submit([] {
        emit("task 2 doing something");
        std::this_thread::sleep_for(1500ms);
        emit("task 2 is going to throw");
        throw std::runtime_error("here is an error");
    }, use_future);

    // an example of returning a future and then immediately using its continuation.
    // note that the continuation happens on the task_manager's thread pool
    mgr.submit([]
               {
                   emit("task 3 doing something");
                   std::this_thread::sleep_for(500ms);
                   emit("task 3 is done");
               },
               use_future)
            .then([](auto f) {
                try {
                    f.get();
                }
                catch(std::exception const& e) {
                    emit("task 3 threw an exception: ", e.what());
                }
            });

    // block on the future of the second example
    try {
        f.get();
    }
    catch (std::exception &e) {
        emit("task 2 threw: ", e.what());
    }
}

Which would result in the following output:

task 1 is doing something
task 2 doing something
task 3 doing something
task 3 is done
task 1 done
task 1 completed
task 2 is going to throw
task 2 threw: here is an error

And here's the complete code (tested on Apple clang, which is more permissive than gcc, so if I've missed a this-> in a lambda, my apologies):

#define BOOST_THREAD_PROVIDES_FUTURE 1
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION 1
#define BOOST_THREAD_PROVIDES_EXECUTORS 1

/* written by Richard Hodges 2017
 * You're free to use the code, but please give credit where it's due :)
 */
#include <boost/thread/future.hpp>
#include <boost/thread/executors/basic_thread_pool.hpp>
#include <chrono>
#include <condition_variable>
#include <exception>
#include <iostream>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <unordered_map>
#include <utility>

// I made a task an object because I thought I might want to store state in it.
// it turns out that this is not strictly necessary

struct task {

};

/*
 * This is the implementation data for one task_manager
 */
struct task_manager_impl {

    using mutex_type = std::mutex;
    using lock_type = std::unique_lock<mutex_type>;

    auto get_lock() -> lock_type {
        return lock_type(mutex_);
    }

    auto add_task(lock_type const &lock, std::unique_ptr<task> t) {
        auto id = t.get();
        task_map_.emplace(id, std::move(t));
    }

    auto remove_task(lock_type lock, task *task_id) {
        task_map_.erase(task_id);
        if (task_map_.empty()) {
            lock.unlock();
            no_more_tasks_.notify_all();
        }
    }

    auto wait(lock_type lock) {
        no_more_tasks_.wait(lock, [this]() { return task_map_.empty(); });
    }

    // for this example I have chosen to express errors as exceptions
    using error_type = std::exception_ptr;

    mutex_type mutex_;
    std::condition_variable no_more_tasks_;


    std::unordered_map<task *, std::unique_ptr<task>> task_map_;
};

/*
 * This stuff is the protocol to figure out whether to return a future
 * or just invoke a callback.
 * Total respect to Christopher Kohlhoff of asio fame for figuring this out
 * I merely step in his footsteps here, with some simplifications because of c++11
 */
struct use_future_t {
};
constexpr auto use_future = use_future_t();

template<class Handler>
struct make_async_handler {
    auto wrap(Handler handler) {
        return handler;
    }

    struct result_type {
        auto get() -> void {}
    };

    struct result_type result;
};

template<>
struct make_async_handler<const use_future_t &> {
    struct shared_state_type {
        boost::promise<void> promise;
    };

    make_async_handler() {
    }

    template<class Handler>
    auto wrap(Handler &&) {
        return [shared_state = this->shared_state](auto error) {
            // boost promises deal in terms of boost::exception_ptr so we need to marshal.
            // this is a small price to pay for the extra utility of boost::promise over
            // std::promise
            if (error) {
                try {
                    std::rethrow_exception(error);
                }
                catch (...) {
                    shared_state->promise.set_exception(boost::current_exception());
                }
            } else {
                shared_state->promise.set_value();
            }
        };
    }


    struct result_type {
        auto get() -> boost::future<void> { return shared_state->promise.get_future(); }

        std::shared_ptr<shared_state_type> shared_state;
    };

    std::shared_ptr<shared_state_type> shared_state = std::make_shared<shared_state_type>();
    result_type result{shared_state};

};

/*
 * Provides the logic of a task manager. Also notice that it maintains a boost::basic_thread_pool
 * The destructor of a basic_thread_pool will not complete until all tasks are complete. So our
 * program will not crash horribly at exit time.
 */
struct task_manager_service {

    /*
     * through this function, the service has full control over how it is created and destroyed.
     */

    static auto use() -> task_manager_service&
    {
        static task_manager_service me {};
        return me;
    }

    using impl_class = task_manager_impl;

    struct deleter {
        void operator()(impl_class *p) {
            service_->destroy(p);
        }

        task_manager_service *service_;
    };

    /*
     * defining impl_type in terms of a unique_ptr ensures that the handle will be
     * moveable but not copyable.
     * Had we used a shared_ptr, the handle would be copyable with shared semantics.
     * That can be useful too.
     */
    using impl_type = std::unique_ptr<impl_class, deleter>;

    auto construct() -> impl_type {
        return impl_type(new impl_class(),
                         deleter {this});
    }

    auto destroy(impl_class *impl) -> void {
        wait(*impl);
        delete impl;
    }

    template<class Job, class Handler>
    auto submit(impl_class &impl, Job &&job, Handler &&handler) {

        auto make_handler = make_async_handler<Handler>();


        auto async_handler = make_handler.wrap(std::forward<Handler>(handler));

        auto my_task = std::make_unique<task>();
        auto task_ptr = my_task.get();

        auto task_done = [
                this,
                task_id = task_ptr,
                &impl,
                async_handler
        ](auto error) {
            async_handler(error);
            this->remove_task(impl, task_id);
        };
        auto lock = impl.get_lock();
        impl.add_task(lock, std::move(my_task));
        launch(impl, task_ptr, std::forward<Job>(job), task_done);

        return make_handler.result.get();
    };

    template<class F, class Handler>
    auto launch(impl_class &, task *task_ptr, F &&f, Handler &&handler) -> void {
        this->thread_pool_.submit([f, handler] {
            auto error = std::exception_ptr();
            try {
                f();
            }
            catch (...) {
                error = std::current_exception();
            }
            handler(error);
        });
    }


    auto wait(impl_class &impl) -> void {
        impl.wait(impl.get_lock());
    }

    auto remove_task(impl_class &impl, task *task_id) -> void {
        impl.remove_task(impl.get_lock(), task_id);
    }


    boost::basic_thread_pool thread_pool_{std::thread::hardware_concurrency()};

};

/*
 * The task manager handle. Holds the task_manager implementation plus provides access to the
 * owning task_manager_service. In this case, the service is a global static object. In an io loop environment
 * for example, asio, the service would be owned by the io loop.
 */
struct task_manager {

    using service_type = task_manager_service;
    using impl_type = service_type::impl_type;
    using impl_class = decltype(*std::declval<impl_type>());

    task_manager()
            : service_(std::addressof(service_type::use()))
            , impl_(get_service().construct()) {}

    template<class Job, class Handler>
    auto submit(Job &&job, Handler &&handler) {
        return get_service().submit(get_impl(),
                                    std::forward<Job>(job),
                                    std::forward<Handler>(handler));
    }

    auto get_service() -> service_type & {
        return *service_;
    }

    auto get_impl() -> impl_class & {
        return *impl_;
    }

private:

    service_type* service_;
    impl_type impl_;
};


/*
 * helpful thread-safe emitter
 */
std::mutex thing_mutex;

template<class...Things>
void emit(Things &&...things) {
    auto lock = std::unique_lock<std::mutex>(thing_mutex);
    using expand = int[];
    void(expand{0,
                ((std::cout << things), 0)...
    });
    std::cout << std::endl;
}

using namespace std::literals;

int main() {
    task_manager mgr;

    // an example of using async callbacks to indicate completion and error
    mgr.submit([] {
                   emit("task 1 is doing something");
                   std::this_thread::sleep_for(1s);
                   emit("task 1 done");
               },
               [](auto err) {
                   if (not err) {
                       emit("task 1 completed");
                   } else {
                       emit("task 1 failed");
                   }
               });

    // an example of returning a future (see later)
    auto f = mgr.submit([] {
        emit("task 2 doing something");
        std::this_thread::sleep_for(1500ms);
        emit("task 2 is going to throw");
        throw std::runtime_error("here is an error");
    }, use_future);

    // an example of returning a future and then immediately using its continuation.
    // note that the continuation happens on the task_manager's thread pool
    mgr.submit([] {
                   emit("task 3 doing something");
                   std::this_thread::sleep_for(500ms);
                   emit("task 3 is done");
               },
               use_future)
            .then([](auto f) {
                try {
                    f.get();
                }
                catch (std::exception const &e) {
                    emit("task 3 threw an exception: ", e.what());
                }
            });

    // block on the future of the second example
    try {
        f.get();
    }
    catch (std::exception &e) {
        emit("task 2 threw: ", e.what());
    }
}
Richard Hodges
  • Wow. That's a lot of code. I still have to give it due attention. Are you sure the handle/impl separation isn't the hammer, and there are many nails? I'm a bit skeptical from the outset. I've seen too many patterns that are simply glorified versions of "add a layer of indirection". I'll be evaluating this to find the essence of it, for sure. Maybe I'll see the light and post back when I do! – sehe Jul 15 '17 at 19:52
  • @sehe one could, of course, coalesce all this logic into one (or two) classes, but then you'd lose the separation of concerns between the interface and the implementation. This way allows us to write one (templated) handle interface which defers to one (templated) service, which allows us to implement various types of copyability/asynchronicity with no code changes other than perhaps a policy class to specialise the service. I use this technique to provide the same interface regardless of the underlying OS details (such as iOS, posix, emscripten, et al.). – Richard Hodges Jul 15 '17 at 21:00
1

The easy solution, in my opinion, is not to use std::async and to use std::thread instead.

You need to be careful, though: your code currently has a lot of data races. Consider using another mutex or some other technique to prevent them.

std::thread{[=]() {
    // Task is running...
    auto eventObject = mEventListeners.find(messageID);
    if (eventObject != mEventListeners.end())
    {
        for (auto listener : eventObject->second)
        {
            listener();
        }
    }
}}.detach(); // detach thread so that it continues
Rakete1111
  • So by using thread.detach, it doesn't matter that the temporary thread value goes out of scope? The only reason I was pushing futures on a vector was to keep the runnable 'in scope' until all the event handlers had completed for the triggered event. – tuskcode Jul 15 '17 at 12:43
  • @Pat Exactly, the execution thread is detached from the `std::thread` instance. – Rakete1111 Jul 15 '17 at 12:44
  • Great, I'll test it out and see how it goes. Will get back to you shortly – tuskcode Jul 15 '17 at 12:46
  • I don't even really need the counting, so I haven't used that element of the code. I have added a lock_guard to synchronize access to the mEventListeners object. Can you foresee any other races? – tuskcode Jul 15 '17 at 12:59
  • @Pat Every time you access `mEventListeners` you need to lock the mutex (even if it's outside of the threads), and `listener()` also needs to be thread safe. – Rakete1111 Jul 15 '17 at 13:00
  • @Pat As pointed out in the comments you have some race issues. It is not enough just to lock access to `mEventListeners` because each entry returns a list (vector?) of listeners. Any access to that list **also** needs to be mutex controlled. So you probably need one mutex per list of listeners (per map entry) in *addition* to locking the map itself when finding the entry. Otherwise you have to lock the whole map for the whole time you call the listeners. Then you can get away with just one mutex for the whole map. – Galik Jul 15 '17 at 13:18
  • @Galik I am waiting on Rakete1111 to approve my edits with my solution to the data races. If access to mEventListeners is synchronized, I don't need a unique synchronization mechanism for each of my listener lists. – tuskcode Jul 15 '17 at 13:23
  • @Pat You can use just one mutex for the whole map. But then you have to keep the whole map locked while you iterate through each listener. That effectively forces your threads to run consecutively rather than in parallel. – Galik Jul 15 '17 at 13:25
  • @Pat If you have `C++14` you could differentiate between read and write locks which would likely make that approach more efficient. – Galik Jul 15 '17 at 13:27
  • @Galik, I looked at what I thought was going to work, but I am concerned about my use of the iterator that is returned from mEventListeners.find(messageID). If I try to limit the scope of the first lock to allow other threads to access mEventListeners, I risk my iterator becoming invalid. Any advice on what to do? – tuskcode Jul 15 '17 at 14:15
  • @Pat As long as you don't modify the vector (technically, if you don't cause it to reallocate), the iterators will always be valid. – Rakete1111 Jul 15 '17 at 14:47
  • @Rakete1111 Yes it has become apparent that this is probably the biggest flaw with my design. I don't think I can make such a strong guarantee. Are there any container that would provide this safety? I should probably do some benchmarks to see whether the mEventListeners lock_guard limits the performance enough for me to require a better method. – tuskcode Jul 15 '17 at 14:54
  • @Pat `std::list`, but it has other shortcomings, like slow lookup. – Rakete1111 Jul 15 '17 at 14:55
  • Yeah and after a number of additions and removals, does the memory layout of the list become fragmented? – tuskcode Jul 15 '17 at 14:57
  • @Pat May very well be. That guarantee is only with `std::vector` I think. `std::array` probably too, but it's not dynamic. – Rakete1111 Jul 15 '17 at 14:57
  • @Pat This is a whole other question in itself. It depends so much on the dynamics. If these events are *high frequency* then creating one `thread` per event may be excessive in itself. I might consider just one thread to run the whole event system - firing all the events (one after another) and not bother with an individual thread for each event. At the very least use a thread pool. – Galik Jul 15 '17 at 15:10
  • @Pat Otherwise You could take a **copy** of your list of listeners before releasing the lock on the map, that way you don't have to worry about the ones in the map. You can safely iterate through your **copy** without synchronization problems. In that case it would be possible for a listener to receive an event after it has removed itself from the list of listeners. As long as you can tolerate that. – Galik Jul 15 '17 at 15:10
  • @Rakete1111 In this case you cannot join the detached thread, right? You may want to join all spawned threads, e.g. on app exit. – synther Aug 12 '18 at 12:25
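The copy-under-lock approach suggested by Galik in the comments above can be sketched roughly as follows. This is only an illustration under stated assumptions: `EventBus`, `subscribe` and `raise` are hypothetical names, `EventID` is assumed to be an `int`, and listeners are assumed to be `std::function<void()>`. The real types in the question are not shown.

```cpp
#include <functional>
#include <map>
#include <mutex>
#include <utility>
#include <vector>

using EventID = int;                     // assumption: real type not shown
using Listener = std::function<void()>;  // assumption: real type not shown

// Hypothetical event bus illustrating the snapshot technique.
class EventBus {
public:
    void subscribe(EventID id, Listener listener) {
        std::lock_guard<std::mutex> lock(mutex_);
        listeners_[id].push_back(std::move(listener));
    }

    void raise(EventID id) {
        std::vector<Listener> snapshot;
        {
            // Hold the lock only long enough to copy the listener list.
            std::lock_guard<std::mutex> lock(mutex_);
            auto it = listeners_.find(id);
            if (it != listeners_.end()) {
                snapshot = it->second;
            }
        }
        // Invoke the copies without the lock, so listeners may themselves
        // call subscribe() without deadlocking, and no iterator into the
        // shared map is used after the lock is released.
        for (auto& listener : snapshot) {
            listener();
        }
    }

private:
    std::mutex mutex_;
    std::map<EventID, std::vector<Listener>> listeners_;
};
```

The trade-off is the one already noted in the comments: a listener that is removed after the snapshot is taken can still be called once more, and each listener still has to be thread-safe if `raise` is called from several threads.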