SHORT ANSWERS
- Why does the program stop before all completion handlers have run?
I have no direct idea; your own Godbolt link seems to contradict the premise, and so does this slightly embellished example: https://godbolt.org/z/WMKa4sqaE
See below for some notes about the changes.
- Is this code violating some asio principle?
Maybe. See below.
It seems like something that would be fairly common to want to do, but I find very few examples of people doing similar things.
Yes. The docs have a very similar example: "To see this in practice, let's use a detached thread to adapt a synchronous operation into an asynchronous one"
- (bonus) I want to swap std::thread with concurrency::task. The problem is then that I can't use a move-only type in the lambda capture. I tried self = make_shared<remove_reference_t>(move(self)), but this caused the three handlers to just print str: without the args. I believe this has something to do with the fact that the Self type (really an asio::detail::compose_op) contains a moved-in copy of the impl. So when I go to print, I'm using the old moved-from version. Anyone have any insight why that might be the case?
Beast has some helpers in their code base (stable_operation_base or something, off the top of my head). Also see this blog post by Richard Hodges, which creates a shared_composed_op from a composed_op that affords reference stability of the composed operation implementation.
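To illustrate the shared-ownership idea only (my own sketch, not the blog post's actual API; copy_requiring_api is a stand-in for e.g. concurrency::create_task):

// Hypothetical sketch: give a copy-requiring continuation shared ownership
// of the move-only `self`, instead of copying any of its state.
auto shared = std::make_shared<std::decay_t<decltype(self)>>(std::move(self));
copy_requiring_api([shared] {           // the lambda is now copyable
    auto& [fn, args] = shared->impl_;   // always the single live instance
    std::apply(fn, args);
    shared->complete({});
});

The point is to never touch the moved-from original after the move; all access goes through the one shared instance.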
LONG ANSWERS
Yes. Resumed coroutines are not work - it's only when they suspend that they typically enqueue an operation with a completion handler to resume them.

This is already the case with non-C++20 stackful coros, as Tanner has made explicitly clear on occasion:
While spawn() adds work to the io_service (a handler that will start and jump to the coroutine), the coroutine itself is not work. To prevent the io_service event loop from ending while a coroutine is outstanding, it may be necessary to add work to the io_service before yielding.
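Before structured work tracking, the manual fix was an explicit work guard. A sketch (asio::make_work_guard is real API; ioc and do_blocking_work are placeholders):

auto guard = asio::make_work_guard(ioc.get_executor());
std::thread([guard = std::move(guard)]() mutable {
    do_blocking_work(); // placeholder for the actual blocking call
    guard.reset();      // now ioc.run() is free to return
}).detach();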
What's worse: when you interact with more than one IO object, they may be associated with different execution contexts, so you might need to track work on multiple executors.
The good news is that Asio (Chris) knew about this, which is why the signature of async_compose takes a list of IoObjectsOrExecutors:
template<
    typename CompletionToken,
    typename Signature,
    typename Implementation,
    typename... IoObjectsOrExecutors>
DEDUCED async_compose(
    Implementation && implementation,
    CompletionToken & token,
    IoObjectsOrExecutors &&... io_objects_or_executors);
io_objects_or_executors: Zero or more I/O objects or I/O executors for which outstanding work must be maintained.
The composed operation specialized on your callable type will effectively use boost::asio::prefer(ex, execution::outstanding_work.tracked) on all of the associated executors.
So as long as the composed operation (self) stays around, there should be work.
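In other words, per associated executor the operation holds something morally equivalent to:

auto tracked = asio::prefer(ex, asio::execution::outstanding_work.tracked);
// while `tracked` lives inside the operation, ex's context counts it as work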
Services Are Not IO Objects Or Executors
You pass the service ("execution context") itself instead of an executor. When passing executors, prefer to pass them by value.
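Concretely (a sketch, using async_task and slow_print from the demo below):

asio::io_context ctx;
auto ex = ctx.get_executor(); // cheap, copyable handle: fine to pass by value
async_task(ex, asio::detached, slow_print, "demo"s); // good: an executor
// async_task(ctx, ...);      // avoid: passes the service itself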
Then What Went Wrong?
Again, I don't really know, as I didn't exactly reproduce your complaints.

However, keep in mind the semantics of completion. In simplified pseudo-code, complete() does:
void complete() {
    work_.reset();
    handler_();
}
In other words, don't expect the work guards to stick around past completion. In fact, the order is pretty central to the allocation guarantees of the library.
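So if the handler itself needs to schedule more asynchronous steps, it has to do so explicitly; a queued post() counts as work in its own right:

asio::post(ex, [] { std::cout << "follow-up" << std::endl; });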
(More) Reliable Debug Output
In C++, use std::flush (or std::endl) if you want output to appear.
Otherwise you might just be confused about output timing. This is frequently a
source of confusion when printing stuff from completion handlers in Asio.
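To illustrate the difference:

std::cout << "in handler";               // may sit in the stream buffer
std::cout << "in handler" << std::flush; // forced out right away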
For maximum insight, I'll introduce a variadic trace function that also timestamps each trace:
namespace {
    using std::this_thread::sleep_for;
    static auto now   = std::chrono::steady_clock::now;
    static auto start = now();

    static std::mutex trace_mx;
    static void trace(auto const&... args) {
        std::unique_lock lk(trace_mx);
        ((std::cout << "at" << std::setw(5) << (now() - start) / 1ms << "ms ") << ... << args) << std::endl;
    }
} // namespace
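Usage is just trace-with-arguments, e.g.:

trace("hello ", 42); // prints something like: at  123ms hello 42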
Side Note: use_future
I don't get what you tried to achieve with the std::async version. As it stands, you're demonstrating why std::async has been a bad design.
If you are looking to demonstrate Asio's future support, I'd write:
auto fut = async_task(ctx, asio::use_future, slow_print, "future"s);
try {
    fut.get();
    std::cout << "future done" << std::endl;
} catch (std::exception const& e) {
    std::cout << "future error: " << e.what() << std::endl;
}
Now, to avoid interfering with the service because the future will block, I'd suggest running the service in the background instead:
asio::thread_pool ctx{1};
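Spelled out, that could look like (a sketch, again using async_task and slow_print from the demo below):

asio::thread_pool ctx{1}; // the service runs on its own thread
auto fut = async_task(ctx.get_executor(), asio::use_future, slow_print, "future"s);
fut.get();                // blocking here no longer starves the service
ctx.join();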
Of course, you can invert the situation by introducing a thread for the blocking wait:
std::thread ft{[ex] {
    auto fut = async_task(ex, asio::use_future, slow_print, "future");
    try {
        fut.get();
        std::cout << "future done" << std::endl;
    } catch (std::exception const& e) {
        std::cout << "future error: " << e.what() << std::endl;
    }
}};

ctx.run();
ft.join();
Double Moves
In your task implementation, you both move self and copy *this. However, compose_op aggregates your async_task_impl (as the impl_ member), so there is a timing link between those. As far as I know, the evaluation order of lambda captures is unspecified.
I'd suggest avoiding the unnecessary copy:
std::thread([self = std::move(self)]() mutable {
    auto& me = self.impl_;
    try {
        std::apply(me.fn_, me.args_);
        self.complete({});
    } catch (std::exception& e) {
        self.complete(std::current_exception());
    }
}).detach();
Or indeed, going for syntactic sugar:
std::thread([self = std::move(self)]() mutable {
    auto& [fn, args] = self.impl_;
    try {
        std::apply(fn, args);
        self.complete({});
    } catch (std::exception& e) {
        self.complete(std::current_exception());
    }
}).detach();
To make it even more elegant, just pass the self as a mutable argument instead of capturing it (this may not work with concurrency::create_task, of course):
std::thread([](auto self) {
    auto& [fn, args] = self.impl_;
    try {
        std::apply(fn, args);
        self.complete({});
    } catch (std::exception& e) {
        self.complete(std::current_exception());
    }
}, std::move(self)).detach();
Perfect Storage vs. Perfect Forwarding
Another place where you are not 100% clear about the forwarding intent is in the async_task_impl constructor. Args... is already in a non-deduced context there, so Args&&... mandates rvalues. This might be why you used ""s literals?
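To see why (hypothetical Fn; with Args fixed to <std::string>, the parameter is std::string&&, a plain rvalue reference, not a forwarding reference):

std::string s = "hello";
// async_task_impl<Fn, std::string>{fn, s};        // error: lvalue won't bind
// async_task_impl<Fn, std::string>{fn, "hello"s}; // OK: rvalue binds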
There are several ways to fix this. First, you can simply let the compiler do its job:
async_task_impl(Fn&& fn, Args... args)
    : fn_(std::forward<Fn>(fn))
    , args_(std::move(args)...) {}
If you feel that's a pessimization (does your code base use expensive non-move-aware argument types?), the simplest fix is to make the constructor an independent template:
template <typename Fn2, typename... Args2>
async_task_impl(Fn2&& fn, Args2&&... args)
    : fn_(std::forward<Fn2>(fn))
    , args_(std::forward<Args2>(args)...) {}
I would probably go all the way and be explicit about the decay moment using a deduction guide. The best part is that you no longer require a constructor at all:
template <typename Fn, typename... Args> struct async_task_impl {
    Fn fn_;
    std::tuple<Args...> args_;

    auto operator()(auto& self) const {
        // @todo: use concurrency::create_task
        std::thread(
            [](auto self) {
                auto& [fn, args] = self.impl_;
                try {
                    std::apply(fn, args);
                    self.complete({});
                } catch (std::exception& e) {
                    self.complete(std::current_exception());
                }
            },
            std::move(self))
            .detach();
    }
};

template <typename... Init> async_task_impl(Init&&...) -> async_task_impl<std::decay_t<Init>...>;
// runs some blocking task on a background thread (eventually: the Windows
// thread pool) and wraps it in a nice asio wrapper
template <typename Executor, typename Token, typename Fn, typename... Args>
auto async_task(Executor& executor, Token&& token, Fn&& func, Args&&... args) {
    return asio::async_compose<Token, void(std::exception_ptr)>(
        async_task_impl{std::forward<Fn>(func), std::forward<Args>(args)...}, token, executor);
}
Full Demo
Combining all the above:
Live On Coliru
Live On Compiler Explorer
#include "boost/asio.hpp"
#include <iomanip>
#include <iostream>
using namespace std::chrono_literals;
namespace {
using std::this_thread::sleep_for;
static auto now = std::chrono::steady_clock::now;
static auto start = now();
static std::mutex trace_mx;
static void trace(auto const&... args) {
std::unique_lock lk(trace_mx);
((std::cout << "at" << std::setw(5) << (now() - start) / 1ms << "ms ") << ... << args) << std::endl;
}
} // namespace
template <typename Fn, typename... Args> struct async_task_impl {
Fn fn_;
std::tuple<Args...> args_;
auto operator()(auto& self) const {
// @todo: use concurrency::create_task
std::thread(
[](auto self) {
auto& [fn, args] = self.impl_;
try {
std::apply(fn, args);
self.complete({});
} catch (std::exception& e) {
self.complete(std::current_exception());
}
},
std::move(self))
.detach();
}
};
template <typename... Init> async_task_impl(Init&&...) -> async_task_impl<std::decay_t<Init>...>;
// wrap blocking task in an asio wrapper
namespace asio = boost::asio;
template <typename Executor, typename Token, typename Fn, typename... Args>
auto async_task(Executor executor, Token&& token, Fn&& func, Args&&... args) {
return asio::async_compose<Token, void(std::exception_ptr)>(
async_task_impl{std::forward<Fn>(func), std::forward<Args>(args)...}, token, executor);
}
void slow_print(std::string str) {
sleep_for(500ms);
trace("slow_print: ", str);
sleep_for(500ms);
}
asio::awaitable<void> my_coro() {
auto ex = co_await asio::this_coro::executor;
co_await async_task(ex, asio::use_awaitable, slow_print, "coro");
trace("coro done");
}
void run_tests(auto ex) {
async_task(
ex, [](std::exception_ptr) { trace("callback done"); }, slow_print, "callback");
asio::co_spawn(ex, my_coro(), asio::detached);
std::thread ft{[ex] {
auto fut = async_task(ex, asio::use_future, slow_print, "future");
fut.get();
trace("future done");
}};
ft.join();
}
int main() try {
{
trace("Starting ctx1");
asio::io_context ctx1;
run_tests(ctx1.get_executor());
trace("Waiting ctx1");
ctx1.run();
trace("Done ctx1");
}
trace("----\n");
{
trace("Starting ctx2");
asio::thread_pool ctx2{1};
run_tests(ctx2.get_executor());
trace("Waiting ctx2");
ctx2.join();
trace("Done ctx2");
}
sleep_for(2s);
trace("Bye");
} catch (std::exception const& e) {
trace(e.what());
}
Prints
at    0ms Starting ctx1
at  500ms slow_print: callback
at  500ms slow_print: future
at 1000ms callback done
at 1000ms future done
at 1000ms Waiting ctx1
at 1501ms slow_print: coro
at 2001ms coro done
at 2001ms Done ctx1
at 2001ms ----

at 2001ms Starting ctx2
at 2501ms slow_print: callback
at 2501ms slow_print: future
at 2502ms slow_print: coro
at 3001ms callback done
at 3002ms future done
at 3002ms coro done
at 3002ms Waiting ctx2
at 3002ms Done ctx2
at 5002ms Bye