1

I am trying to write a general async_task(Executor& executor, Token&& token, Fn&& func, Args&&... args) async initiating function.

The goal is to wrap arbitrary, blocking, third-party functions in a thread, and provide an asio-based interface.

It's not perfect yet (for instance, I know I need to post the completion handlers on the executor instead of running them in the thread), but I feel quite close.

I have three issues and questions:

  1. Why does the program stop before all completion handlers have run? I shouldn't need a work guard, since the async operation is ongoing, right? EDIT: I'm mistaken. The non-callback handlers aren't being called at all, as evidenced by putting a sleep_for(1s) after the run() call. So my question is instead, why not?

  2. Is this code violating some asio principle? It seems like something that would be fairly common to want to do, but I find very few examples of people doing similar things.

  3. (bonus) I want to swap std::thread with concurrency::task<void>. The problem is then that I can't use a move-only type in the lambda capture. I tried self = make_shared<remove_reference_t<Self>>(move(self)), but this caused the three handlers to just print str: without the args. I believe this has something to do with the fact that the Self type (really a asio::detail::compose_op) contains a moved-in copy of the impl. So when I go to print, I'm using the old moved-from version. Anyone have any insight why that might be the case?

#include <chrono>
#include <iostream>
#include <memory>
#include <thread>

#include "asio.hpp"

template <typename Fn, typename... Args>
struct async_task_impl {
  std::decay_t<Fn> fn_;
  std::tuple<std::decay_t<Args>...> args_;

  async_task_impl(Fn&& fn, Args&&... args)
      : fn_(std::forward<Fn>(fn)), args_(std::forward<Args>(args)...) {}

  template <typename Self>
  auto operator()(Self& self) {
    // @todo: use concurrency::create_task
    auto t =
        std::thread([me = *this,             // copy impl into thread
                     self = std::move(self)  // move composed_op into thread?
    ]() mutable {
          try {
            std::apply(me.fn_, me.args_);
            self.complete({});
          } catch (std::exception& e) {
            self.complete(std::current_exception());
          }
        });
    t.detach();
  }
};

// runs some blocking task on its own thread and wraps it in asio
template <typename Executor, typename Token, typename Fn, typename... Args>
auto async_task(Executor& executor, Token&& token, Fn&& func, Args&&... args) {
  return asio::async_compose<Token, void(std::exception_ptr)>(
      async_task_impl(std::forward<Fn>(func), std::forward<Args>(args)...),
      token, executor);
}

Test code: Godbolt

void slow_print(std::string str) {
  static std::mutex m;
  std::this_thread::sleep_for(std::chrono::milliseconds(500));
  {
    std::unique_lock lk(m);
    std::cout << "slow_print: " << str << "\n";
  }
  std::this_thread::sleep_for(std::chrono::milliseconds(500));
}

int main() {
  try {
    asio::io_context ctx;

    using namespace std::string_literals;

    async_task(
        ctx, [](std::exception_ptr) { std::cout << "callback done\n"; },
        slow_print, "callback"s);

    asio::co_spawn(
        ctx,
        [&]() -> asio::awaitable<void> {
          co_await async_task(ctx, asio::use_awaitable, slow_print, "coro"s);
        },
        asio::detached);

    auto f = std::async(std::launch::async, [&] {
      auto fut = async_task(ctx, asio::use_future, slow_print, "future"s);
      fut.get();
    });

    ctx.run();
  } catch (std::exception& e) {
    std::cout << e.what() << "\n";
  }
  return 0;
}
MHebes
  • 2,290
  • 1
  • 16
  • 29
  • Also not strictly part of the question, but is there some discord or chat or something for discussions about asio? – MHebes Jan 24 '23 at 07:06
  • 1
    cppslack has an asio and beast channel. TBF the beast channel is usually your pick – sehe Jan 24 '23 at 21:43

1 Answers1

2

SHORT ANSWERS

  1. Why does the program stop before all completion handlers have run?

I have no direct idea, your own Godbolt link seems to the premise, and so does this slightly embellished example: https://godbolt.org/z/WMKa4sqaE See below for some notes about the changes.

  1. Is this code violating some asio principle?

Maybe. See below.

It seems like something that would be fairly common to want to do, but I find very few examples of people doing similar things.

Yes. The docs have a very similar example: "To see this in practice, let's use a detached thread to adapt a synchronous operation into an asynchronous one"

  1. (bonus) I want to swap std::thread with concurrency::task. The problem is then that I can't use a move-only type in the lambda capture. I tried self = make_shared<remove_reference_t>(move(self)), but this caused the three handlers to just print str: without the args. I believe this has something to do with the fact that the Self type (really a asio::detail::compose_op) contains a moved-in copy of the impl. So when I go to print, I'm using the old moved-from version. Anyone have any insight why that might be the case?

Beast has some helpers in their code base (stable_operation_base or something, from the top of my head). Also see this blog post by Richard Hodges which creates a shared_composed_op from a composed_op that afford reference stability of the standard operation implementation.

LONG ANSWERS

Yes. Resumed coroutines are not work - it's only when they suspend they usually enqueue a an operation with a completion handler to resume.

This is already the case with non-c++20 stackful coros, as Tanner has made very explicitly clear on occasion:

While spawn() adds work to the io_service (a handler that will start and jump to the coroutine), the coroutine itself is not work. To prevent the io_service event loop from ending while a coroutine is outstanding, it may be necessary to add work to the io_service before yielding.

What's worse: when you interact with more than one IO object they may be associated with different execution context, so you might need to track work on multiple executors.

The good news is that Asio (Chris) knew about this, which is why the signature of async_compose takes a list of IoObjectsOrExecutors:

template<
    typename CompletionToken,
    typename Signature,
    typename Implementation,
    typename... IoObjectsOrExecutors>
DEDUCED async_compose(
    Implementation && implementation,
    CompletionToken & token,
    IoObjectsOrExecutors &&... io_objects_or_executors);
  • io_objects_or_executors Zero or more I/O objects or I/O executors for which outstanding work must be maintained.

The composed operation specialized on your callable type will effective use boost::asio::prefer(ex, execution::outstanding_work.tracked) on all of the associated executors.

So as long as the composed operation (self) stays around, there should be work.

Services Are Not IO Objects Or Executors

You pass the service ("execution context") itself instead of an executor. When passing executors, prefer to pass by value.

Then What Went Wrong?

Again, I don't really know as I didn't exactly reproduce your complaints.

However, keep in mind the semantics of completion. In simplified pseudo-code, complete() does:

void complete() {
    work_.reset();
    handler_();
}

In other words, don't expect the work guards to stick past completion. In fact the order is pretty central to the allocation guarantees of the library.

(More) Reliable Debug Output

In C++ use std::flush (or std::endl) if you want output to appear. Otherwise you might just be confused about output timing. This is frequently a source of confusion when printing stuff from completion handlers in Asio.

For maximum insight, I'll introduce a variadic trace function that also timestamps each trace:

namespace {
    using std::this_thread::sleep_for;
    static auto now   = std::chrono::steady_clock::now;
    static auto start = now();
    static std::mutex trace_mx;

    static void trace(auto const&... args) {
        std::unique_lock lk(trace_mx);
        ((std::cout << "at" << std::setw(5) << (now() - start) / 1ms << "ms ") << ... << args) << std::endl;
    }
} // namespace

Side Note use_future

I don't get what you tried to achieve with the std::async version. As it stands you're demonstrating why std::async has been a bad design.

If you are looking to demonstrate Asio's future support, I'd write:

auto fut = async_task(ctx, asio::use_future, slow_print, "future"s);
try {
    fut.get();
    std::cout << "future done" << std::endl;
} catch (std::exception const& e) {
    std::cout << "future error: " << e.what() << std::endl;
}

Now, to avoid interfering with the service because the future will block, I'd suggest running the service in the background instead:

asio::thread_pool ctx{1};

Of course, you can invert the situation by introducing a thread for the blocking wait:

std::thread ft{[ex] {
    auto fut = async_task(ex, asio::use_future, slow_print, "future");
    try {
        fut.get();
        std::cout << "future done" << std::endl;
    } catch (std::exception const& e) {
        std::cout << "future error: " << e.what() << std::endl;
    }
}};

ctx.run();
ft.join();

Double Moves

In your task implementation, you both move self and copy *this. However compose_op aggregates your async_task_impl (as the impl_ member), so there is a timing link between those. As far as I know the evaluation order or lambda captures in unspecified.

I'd suggest avoiding the unnecessary copy:

std::thread([self = std::move(self)]() mutable {
    auto& me = self.impl_;
    try {
        std::apply(me.fn_, me.args_);
        self.complete({});
    } catch (std::exception& e) {
        self.complete(std::current_exception());
    }
}).detach();

Or indeed, going for syntactic sugar:

std::thread([self = std::move(self)]() mutable {
    auto& [fn, args] = self.impl_;
    try {
        std::apply(fn, args);
        self.complete({});
    } catch (std::exception& e) {
        self.complete(std::current_exception());
    }
}).detach();

To make it even more elegant, just pass the self as a mutable argument instead of capturing it (this may not work with concurrency::create_task of course):

std::thread([](auto self) {
        auto& [fn, args] = self.impl_;
        try {
            std::apply(fn, args);
            self.complete({});
        } catch (std::exception& e) {
            self.complete(std::current_exception());
        }
    }, std::move(self)).detach();

Perfect Storage vs. Perfect Forwarding

Another place where you are not 100% clear about the forwarding intent is in the async_task_impl constructor. Args... is already in non-deduced context there, so Args&&... mandates rvalues. This might be why you used ""s-literals?

There are several ways to fix

  1. Either you can let the compiler do its job:

    async_task_impl(Fn&& fn, Args... args)
       : fn_(std::forward<Fn>(fn))
       , args_(std::move(args)...) {}
    
  2. If you feel that's a pessimization (does your code-base use expensive non-move-aware argument types?), the simplest is to make the construct an independent template:

    template <typename Fn2, typename... Args2>
    async_task_impl(Fn2&& fn, Args2&&... args)
        : fn_(std::forward<Fn2>(fn))
        , args_(std::forward<Args2>(args)...) {}
    
  3. I would probably go all-the-way and be explicit about the decay moment using a deduction guide. The best part is you no longer require a constructor at all:

    template <typename Fn, typename... Args> struct async_task_impl {
        Fn                  fn_;
        std::tuple<Args...> args_;
    
        auto operator()(auto& self) const {
            // @todo: use concurrency::create_task
            std::thread(
                [](auto self) {
                    auto& [fn, args] = self.impl_;
                    try {
                        std::apply(fn, args);
                        self.complete({});
                    } catch (std::exception& e) {
                        self.complete(std::current_exception());
                    }
                }, std::move(self)).detach();
        }
    };
    
    template <typename... Init> async_task_impl(Init&&...) -> async_task_impl<std::decay_t<Init>...>;
    
    // runs some blocking task on the windows thread pool and wraps it in a nice
    // asio wrapper
    template <typename Executor, typename Token, typename Fn, typename... Args>
    auto async_task(Executor& executor, Token&& token, Fn&& func, Args&&... args) {
        return asio::async_compose<Token, void(std::exception_ptr)>(
            async_task_impl{std::forward<Fn>(func), std::forward<Args>(args)...}, token, executor);
    }
    

Full Demo

Combining all the above:

Live On Coliru Live On Compiler Explorer

#include "boost/asio.hpp"
#include <iomanip>
#include <iostream>
using namespace std::chrono_literals;

namespace {
    using std::this_thread::sleep_for;
    static auto now   = std::chrono::steady_clock::now;
    static auto start = now();
    static std::mutex trace_mx;

    static void trace(auto const&... args) {
        std::unique_lock lk(trace_mx);
        ((std::cout << "at" << std::setw(5) << (now() - start) / 1ms << "ms ") << ... << args) << std::endl;
    }
} // namespace

template <typename Fn, typename... Args> struct async_task_impl {
    Fn                  fn_;
    std::tuple<Args...> args_;

    auto operator()(auto& self) const {
        // @todo: use concurrency::create_task
        std::thread(
            [](auto self) {
                auto& [fn, args] = self.impl_;
                try {
                    std::apply(fn, args);
                    self.complete({});
                } catch (std::exception& e) {
                    self.complete(std::current_exception());
                }
            },
            std::move(self))
            .detach();
    }
};

template <typename... Init> async_task_impl(Init&&...) -> async_task_impl<std::decay_t<Init>...>;

// wrap blocking task in an asio wrapper
namespace asio = boost::asio;
template <typename Executor, typename Token, typename Fn, typename... Args>
auto async_task(Executor executor, Token&& token, Fn&& func, Args&&... args) {
    return asio::async_compose<Token, void(std::exception_ptr)>(
        async_task_impl{std::forward<Fn>(func), std::forward<Args>(args)...}, token, executor);
}

void slow_print(std::string str) {
    sleep_for(500ms);
    trace("slow_print: ", str);
    sleep_for(500ms);
}

asio::awaitable<void> my_coro() {
    auto ex = co_await asio::this_coro::executor;
    co_await async_task(ex, asio::use_awaitable, slow_print, "coro");
    trace("coro done");
}

void run_tests(auto ex) {
    async_task(
        ex, [](std::exception_ptr) { trace("callback done"); }, slow_print, "callback");

    asio::co_spawn(ex, my_coro(), asio::detached);

    std::thread ft{[ex] {
        auto fut = async_task(ex, asio::use_future, slow_print, "future");
        fut.get();
        trace("future done");
    }};

    ft.join();
}

int main() try {
    {
        trace("Starting ctx1");
        asio::io_context ctx1;

        run_tests(ctx1.get_executor());

        trace("Waiting ctx1");
        ctx1.run();
        trace("Done ctx1");
    }

    trace("----\n");
    {
        trace("Starting ctx2");
        asio::thread_pool ctx2{1};

        run_tests(ctx2.get_executor());

        trace("Waiting ctx2");
        ctx2.join();
        trace("Done ctx2");
    }

    sleep_for(2s);
    trace("Bye");
} catch (std::exception const& e) {
    trace(e.what());
}

Prints

at    0ms Starting ctx1
at  500ms slow_print: callback
at  500ms slow_print: future
at 1000ms callback done
at 1000ms future done
at 1000ms Waiting ctx1
at 1501ms slow_print: coro
at 2001ms coro done
at 2001ms Done ctx1
at 2001ms ----

at 2001ms Starting ctx2
at 2501ms slow_print: callback
at 2501ms slow_print: future
at 2502ms slow_print: coro
at 3001ms callback done
at 3002ms future done
at 3002ms coro done
at 3002ms Waiting ctx2
at 3002ms Done ctx2
at 5002ms Bye
sehe
  • 374,641
  • 47
  • 450
  • 633
  • God bless man, this is the most useful answer I could have hoped for and helps me understand the machinery much better. Let me respond to a few points: `(1)` I was just using `std::async` as a quick shorthand for the `thread + join() after run()` combo `(2)` It's encouraging to see you access `self.impl_`; I attempted that once but it seemed so wrong to pilfer the internals of a `::details` object like that I figured there was a better way `(3)` Your points about forwarding are much appreciated, I was slightly out of my depth trying to vaguely keep track of r/lvalues. Thanks again! – MHebes Jan 25 '23 at 17:05
  • 1
    Got this working with `concurrency::task` by changing the `std::thread([](auto self) { ... }).detach();` to `concurrency::create_task([self = std::make_shared>(std::move(self))]() mutable { ... };` Ugly but is also basically what `std::async` does – MHebes Jan 25 '23 at 19:28
  • 1
    I played a little further to also get correct handler invocation yesterday: http://coliru.stacked-crooked.com/a/0b58622e26054278 It traces thread "ids" while distinguishing registered service threads/"wild" threads. I'm also not keen on using the implementation details, but TBH I cannot figure out how to get correct handler invocation otherwise. `compose_op.complete(...)` doesn't seem to do that. Maybe it is due to the nature of `async_compose` instead of `async_initiate` [see this excellent comment](https://github.com/boostorg/beast/issues/1772#issuecomment-558688122) – sehe Jan 25 '23 at 19:48
  • That version seems to not run the coroutine; is this because of the same issue with coroutines not actually being work? – MHebes Jan 25 '23 at 21:15
  • Sorry, that may have been an artifact of coliru. Running it locally seems to run as expected! – MHebes Jan 25 '23 at 21:24
  • @MHebes interesting. It was a one-off http://coliru.stacked-crooked.com/a/94ccf10ba3ba4242 but in fact it might be just the first time I was able to witness the effect you obeserved. I can't see an explanation from first principles. The only "danger area" I saw is where the workguard(s) are released before invocation of the handler. But then we post that (`dispatch` from the detached thread is by definition going to `post`) which is work in its own right ¯\\_(ツ)_/¯ – sehe Jan 25 '23 at 22:12
  • I'm stress testing now on my box until I get a similar run. Will also instrument the code in various ways, possibly emulating lower resource conditions that exist on Coliru – sehe Jan 25 '23 at 22:13
  • I really cannot reproduce it - locally nor on Coliru - see http://coliru.stacked-crooked.com/a/b0178fd3b8ab09e9. I even went back to 1.79.0 to check/compare. Different optimization levels. ¯\\_(ツ)_/¯ (locally I ran 48 concurrent process instances of the [original code](https://stackoverflow.com/questions/75218245/async-compose-not-keeping-io-context-full-of-work/75228866?noredirect=1#comment132767807_75228866) to create load as well) – sehe Jan 25 '23 at 23:41
  • Having some interesting behavior on MSVC---Tried to make a "wrap in a coroutine" function to do the `co_await this_coro::executor` + `use_awaitable` parts for you and it is giving me garbage output like `╕∩┐ïⁿ` or `╡(▼è÷` (or sometimes blank) on MSVC. Works fine on [godbolt](https://godbolt.org/z/Pn8T3Tnsn) clang though so maybe an issue specific to MSVC. – MHebes Jan 26 '23 at 17:59
  • 1
    I can't [get it](https://godbolt.org/z/hjjahd37P) to [compile at all](https://rextester.com/FBJK73635). If you can minimize it (e.g. http://coliru.stacked-crooked.com/a/7f8c0235363ccf89) and still repro, it's worth a new question IMO – sehe Jan 27 '23 at 00:31
  • For what it's worth, I have made a new question where I think I've narrowed down the issue. https://stackoverflow.com/q/76623249/3554391. Not 100% confident that that's what's happening here -- but if I make `wrap_coro` take its params by value and move them into `async_task` it works fine. – MHebes Jul 05 '23 at 19:47
  • 1
    @MHebes thanks for the hint. I'll store this as a cautionary note near my habit of always passing-by-value if I accept temporaries. I forgot there is an intersection with coro's not being conducive to references (unless they happen to be life-time guaranteed). In most other cases "perfect forwarding" seems like a slightly more exact, though tedious way to write, so I didn't give it all the attention it deserved. (I did notice the need to be exact about decay for storage in my answer code, though, if I remember correctly) – sehe Jul 05 '23 at 21:57