
In boost-asio, I realized that there was no easy way to have something that resembles a condition variable.

However, I realized that I could get something very similar using stackful coroutines (https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/overview/core/spawn.html), while using an io_context as a 'queue' for completion tokens. Below is my approach:

#include "asio_types.hpp"
#include <atomic>
#include <iostream>

class async_pending_queue {
public:
  async_pending_queue()
      : pending_handlers_(0), strand_(pending_queue_), wg_(asio::make_work_guard(pending_queue_)) {}

  template <typename CompletionToken>
  auto async_submit(
      CompletionToken &&token,
      std::function<void(void)> atomic_action = [] {}) {
    auto init = [this, &atomic_action](auto completion_handler) {
      auto posted_lambda = [handler = std::move(completion_handler),
                            this]() mutable {
        pending_handlers_--;
        asio_sys_err ec;
        handler(ec);
      };

      post(strand_, std::move(posted_lambda));

      pending_handlers_++;
      atomic_action();
    };

    return asio::async_initiate<CompletionToken, void(asio_sys_err)>(init,
                                                                     token);
  }

  int pending_count() { return pending_handlers_.load(); }

  // Note: it may run zero handlers instead of one (and return false in that case)
  bool try_run_one() {
    auto cnt = pending_queue_.poll_one();
    std::cout << "completion token result" << cnt << std::endl;
    bool ret = (cnt == 1);
    return ret;
  }

private:
  std::atomic<unsigned int> pending_handlers_;
  asio_ioctx pending_queue_;
  asio_ioctx::strand strand_;
  decltype(asio::make_work_guard(pending_queue_)) wg_;
};

Here, one simply calls my_async_pending_queue.async_submit(yield) from a stackful coroutine. The suspended coroutine can then be resumed by calling my_async_pending_queue.try_run_one().
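
For illustration, here is a minimal, hypothetical usage sketch of the class above (the names pending, service and the main() are mine; it assumes asio_types.hpp provides the same aliases that mem_check.hpp defines below):

#include "async_pending_queue.hpp"
#include <iostream>
#include <thread>

asio_ioctx main_ioctx;
async_pending_queue pending;

int main() {
  asio::spawn(main_ioctx, [](yield_context yield) {
    std::cout << "suspending" << std::endl;
    pending.async_submit(yield); // parks this coroutine on the internal queue
    std::cout << "resumed" << std::endl;
  });

  // Keep run() from returning while the coroutine is parked.
  auto wg = asio::make_work_guard(main_ioctx);
  std::thread service([] { main_ioctx.run(); });

  // From another thread, resume the oldest parked coroutine.
  while (!pending.try_run_one())
    std::this_thread::yield();

  wg.reset();
  service.join();
}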

Using this, I wanted to build a 'memory checker'. It has two functions: request_space and free_space. A coroutine calls request_space, which may block if there is no space left. Meanwhile, another thread or coroutine can call free_space, which resumes blocked coroutines if possible.

I built a toy memory checker wrapper as follows:

#ifndef MEM_CHECK_HPP
#define MEM_CHECK_HPP

#include <cstddef>
#include <mutex>
#include <queue>

#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>

namespace asio = boost::asio;
using asio_ioctx = asio::io_context;
using asio_sys_err = boost::system::error_code; 
using asio::yield_context;

#include "async_pending_queue.hpp"

class MemoryChecker {
public:
  using bytes_cnt = size_t;

  MemoryChecker(asio_ioctx &ioctx, bytes_cnt total_mem = 1024, bytes_cnt initial_fill = 0);

  // This requests some space, and possibly yields;
  void request_space(bytes_cnt cnt, yield_context yield);
  void free_space(bytes_cnt cnt);

private:
  bytes_cnt get_available_mem();

  const bytes_cnt total_mem_;
  bytes_cnt mem_used_;
  std::queue<bytes_cnt> request_queue_;
  async_pending_queue request_routines_;
  std::mutex lock_;
  asio_ioctx::strand fifo_strand_;
  asio_ioctx &completion_ioctx_;
};



inline MemoryChecker::MemoryChecker(asio_ioctx &ioctx, bytes_cnt total_mem,
                             bytes_cnt initial_fill)
    : total_mem_(total_mem), mem_used_(initial_fill), request_queue_{}, lock_{},
      request_routines_{}, completion_ioctx_(ioctx), fifo_strand_(ioctx) {}

inline MemoryChecker::bytes_cnt MemoryChecker::get_available_mem() {
  assert(total_mem_ >= mem_used_);
  return total_mem_ - mem_used_;
}

inline void MemoryChecker::request_space(bytes_cnt cnt, yield_context yield) {
  // if (cnt > total_mem_) throw logic_error
  lock_.lock();
  assert(cnt <= total_mem_);
  assert(cnt > 0);

  if (request_queue_.empty()) {
    assert(request_routines_.pending_count() == 0);
    if (get_available_mem() >= cnt) {
      // We bypass the pending queue
      mem_used_ += cnt;
      lock_.unlock();
      return;
    }
  }

  assert(request_queue_.size() == request_routines_.pending_count());

  std::cout << "Pushing " << cnt << std::endl;
  request_queue_.push(cnt);

  auto wg = asio::make_work_guard(completion_ioctx_);
  request_routines_.async_submit(yield, [this] { lock_.unlock(); });


  auto oldest_req{request_queue_.front()};
  assert(cnt == oldest_req);
  request_queue_.pop();
 
  mem_used_ += cnt;
  assert(request_queue_.size() == request_routines_.pending_count());
  asio::post(fifo_strand_, yield); 
}

inline void MemoryChecker::free_space(bytes_cnt cnt) {
  {
    std::lock_guard<std::mutex> lg{lock_};
    mem_used_ -= cnt;
    // Here, we own the lock, and free as many coroutines as we can
    while (true) {
      if (request_queue_.size() == 0) {
        std::cout << "No pending requests. Bailing" << std::endl;
        break;
      }

      assert(request_queue_.size() == request_routines_.pending_count());

      auto oldest_req{request_queue_.front()};
      auto available_mem{get_available_mem()};

      if (available_mem < oldest_req) {
        std::cout << "Oldest request is larger than available_mem. Bailing" << std::endl;
        break;
      }
      assert(request_routines_.try_run_one() == true);
    }
  }
}

#endif /* MEM_CHECK_HPP */

Here is a test program that can run it:

#include "mem_check.hpp"
#include <thread>
#include <unistd.h>

constexpr size_t mc_size{4};
asio_ioctx ioctx;
size_t total{0};
MemoryChecker mc{ioctx, mc_size};

void requestor_coroutine(size_t rq,yield_context yield) {
  asio::steady_timer t(ioctx);
  while (true) {
    total += rq;
    mc.request_space(rq, yield);
    std::cout << "Got requested space ";

    asio_sys_err ec;
  }
}

int main() {
  asio::spawn(ioctx, [](yield_context yield) { requestor_coroutine(1,yield); });
  asio::spawn(ioctx, [](yield_context yield) { requestor_coroutine(2,yield); });
  asio::spawn(ioctx, [](yield_context yield) { requestor_coroutine(3,yield); });
  asio::spawn(ioctx, [](yield_context yield) { requestor_coroutine(4,yield); });

  std::thread t([] { ioctx.run(); });

  while (true) {
    getchar();
    std::cout << total << std::endl;
    if (total > 0) {
      std::cout << "freeing" << std::endl;
      mc.free_space(1);
      total -= 1;
    }
  }
  t.join();
}

Finally, the problem we face is as follows. When we run the program, the assertion total_mem_ >= mem_used_ fails. On further investigation, I realized that our completion token was being called even when we do not call try_run_one, which was very weird.

Somewhat more surprisingly, if I replace post(strand_, std::move(posted_lambda)); with post(pending_queue_, std::move(posted_lambda));, things seem to work. However, the Asio documentation says that only strands guarantee FIFO execution order. I am not sure whether a plain io_context works as a FIFO queue (even though it seems to in these examples).

Any input would be helpful - I am happy to hear what the problem in this implementation is, as well as about other implementations (for example, using a std::queue as a proper queue instead of this io_context hack).

Suraaj K S

1 Answer


Okay, wrapping my head around the code always starts with an involuntary code review. The following remarks may or may not end up being significant, but let's just keep notes so we don't lose track of them:

  • you're mixing legacy strands (io_context::strand instead of strand<executor>) with modern executor_work_guards (instead of legacy io_context::work). Why?

  • the order of the member initializer lists does not match the declaration order (declaration order wins!). This tells me you don't enable (enough) compiler warnings

  • pending_count() is a racy accessor, which also accidentally converts unsigned to int

  • Atomics do not require an explicit load() unless you're overriding the default memory ordering

  • You're using a single thread to run a single io_context. That means a strand is redundant (because the single service thread is an implicit strand)

  • you capture atomic_action by reference, which is very risky because it is a function local (parameter). You might just be fine here because init is invoked before async_submit returns... BUT

  • async_submit "submits" an atomic_action "async" - but it doesn't? Since you run atomic_action unconditionally and blocking inside the init lambda, by definition async_submit will not return before atomic_action has run. That seems the antithesis of asynchronous?

  • Oof, only now I spy

     request_routines_.async_submit(yield, [this] { lock_.unlock(); });
    

    That's mixing manual thread synchronization with async completion. Async expressly allows concurrency without threads, and strands obviate the need for explicit locking. Mixing them is bound to be inefficient (at best) or lead to deadlocks.

  • A slight code smell is using std::function, because Asio completion handlers typically carry associated executor/allocator/cancellation-slot bindings; std::function erases these bindings (see boost::asio::bind_executor does not execute in strand). It might be fine if atomic_action is not an Asio completion handler in that sense; I'll find out later (a small sketch of the erasure follows after this list)

  • Don't run side-effectful actions in assert:

     assert(request_routines_.try_run_one() == true);
    

    That's because assert will disappear in release builds.

     auto ok = request_routines_.try_run_one();
     assert(ok);
    
  • free_space runs an infinite loop under the lock (deadlock potential)

  • free_space runs try_run_one i.e. io_context::poll_one() under the lock (deadlock potential)

  • Why do you call the return value from poll_one() "completion token result"? That seems confused.

  • In request_space you randomly create a work guard. Why?

     auto wg = asio::make_work_guard(completion_ioctx_);
    
  • fifo_strand_ seems logically unused
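
To illustrate the std::function remark above: the executor association that Asio looks up via get_associated_executor is a property of the handler's type, and it disappears once the handler is stored in a std::function. A minimal standalone sketch (not your code, just the general mechanism; names are mine):

#include <boost/asio.hpp>
#include <functional>
#include <type_traits>

namespace asio = boost::asio;

int main() {
    asio::io_context io;
    auto strand = asio::make_strand(io);

    // A handler with an associated executor (the strand)...
    auto bound = asio::bind_executor(strand, [] {});
    static_assert(!std::is_same_v<asio::associated_executor_t<decltype(bound)>,
                                  asio::system_executor>);

    // ...loses that association once type-erased into std::function.
    std::function<void()> erased = bound;
    static_assert(std::is_same_v<asio::associated_executor_t<decltype(erased)>,
                                 asio::system_executor>);
}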

Zooming out

I'm running out of steam. At this point it's not clear to me what the intended purpose of the different classes is. I do note that async_submit is a misnomer (as indicated above), as its side-effects are very much synchronous.

Is FIFO important? It seems counter-productive if e.g. a large request blocks the queue while smaller ones are waiting as well. Of course, large requesters might become starved, but they can be anyway unless free_space calls are guaranteed to happen in a timely fashion.

If we observe that lock_ should not exist, as it duplicates the purpose of the strand, it seems that async_submit really is just an async_wait that will "complete as soon as possible", but in FIFO order.

Also, it might be important for your intended semantics to make sure that async_submit actually happens on the strand_ (which currently is not the case).

Let's also realize that stackful coroutines implicitly are a strand. Code is sequenced. I'm not sure it is even possible to change executors within a stackful coroutine (I assume there is an actual strand internal to the yield context, and it will take precedence).
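
One way to make that executor tangible (also touched on in the comments below): in recent Boost versions (1.80+, where yield_context exposes get_executor()) you can observe the coroutine's own associated executor, and completions handed the yield token resume the coroutine through it. A tiny sketch, under that assumption:

#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>

namespace asio = boost::asio;

int main() {
    asio::io_context io;

    asio::spawn(io, [](asio::yield_context yield) {
        // The coroutine's own associated executor.
        auto ex = yield.get_executor();

        // Round-trip through that executor: suspend here, resume "in place".
        asio::post(ex, yield);
    });

    io.run();
}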

Just responding to a few loose remarks from the question:

I realized that our completion token was being called even when we do not call try_run_one, which was very weird.

Did you forget about

std::thread t([] { ioctx.run(); });

inside main?

using a std::queue as a proper queue instead of this io_context hack).

You already do. As well. You have strands (multiple). You have locks (manually...). You have the queue. And you have post.

I am not sure if using a simple io_context will work as a FIFO queue (even though it seems to in these examples)

IFF you have a single service thread that is the case (as mentioned above). In your async_pending_queue you have ... the threads that may call try_run_one(). It's intransparent to me.

SIMPLIFY: timer::cancel_one()

Note the docs:

This function forces the completion of one pending asynchronous wait operation against the timer. Handlers are cancelled in FIFO order. The handler for the cancelled operation will be invoked with the asio::error::operation_aborted error code.
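
In other words, a never-expiring steady_timer can act as the "condition variable": async_wait is the wait, cancel_one() is the notify-one, and the waiter simply expects operation_aborted. A bare-bones sketch of just that primitive (my own toy main, not the full solution):

#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <iostream>

namespace asio = boost::asio;

int main() {
    asio::io_context io;

    // A timer that never expires, used purely as a notification primitive.
    asio::steady_timer event(io, asio::steady_timer::time_point::max());

    asio::spawn(io, [&](asio::yield_context yield) {
        std::cout << "waiting" << std::endl;
        boost::system::error_code ignored;
        event.async_wait(yield[ignored]); // completes with operation_aborted
        std::cout << "notified" << std::endl;
    });

    // "Notify one": wakes exactly one pending waiter, in FIFO order.
    asio::post(io, [&] { event.cancel_one(); });

    io.run();
}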

It seems this is enough for your purpose. Now, mind you there's still enough room for subtle errors surrounding strands, but here goes:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <deque>
#include <iostream>

namespace asio   = boost::asio;
using error_code = boost::system::error_code;
using asio::yield_context;

class MemoryChecker {
  public:
    using Size = size_t;

    inline MemoryChecker(asio::any_io_executor ctx, Size total_mem, Size initial_fill = 0)
        : total_(total_mem)
        , in_use_(initial_fill)
        , strand_(ctx) {}

    inline void async_request_space(Size rq, yield_context yield) {
        if (!rq)         return;
        if (rq > total_) throw std::logic_error("async_request_space");

        post(strand_, yield);

        if (queue_.empty() && get_available_mem() >= rq) {
            in_use_ += rq;
            post(yield); // disengage from strand_
        } else {
            queue_.push_back(rq);
            async_wait(yield); // disengages from strand_
        }
    }

    inline void free_space(Size n, yield_context yield) {
        post(strand_, yield);

        in_use_ -= n;

        while (!queue_.empty() && get_available_mem() >= queue_.front()) {
            auto rq = queue_.front();
            queue_.pop_front();

            event_.cancel_one();

            in_use_ += rq;
        }
    }

  private:
    Size get_available_mem() const {
        assert(total_ >= in_use_);
        return total_ - in_use_;
    }

    void async_wait(yield_context yield) {
        error_code ignored;
        event_.async_wait(yield[ignored]); // disengages _strand because yield has its own associated executor
    }

    const Size total_;
    Size       in_use_;

    std::deque<Size> queue_;

    asio::strand<asio::any_io_executor> strand_;
    asio::steady_timer event_{strand_, asio::steady_timer::time_point::max()};
};

std::mutex console_mx;
void trace(auto const&... args) {
    std::lock_guard lk(console_mx);
    (std::cout << ... << args) << std::endl;
}

int main() {
    asio::thread_pool ctx; // require >1 because of blocking IO in "freeer" (producer) task
    size_t            total_requested{0};
    MemoryChecker     mc{ctx.get_executor(), 4};

    auto consumer = [&](size_t rq, yield_context yield) {
        while (true) {
            trace("Requesting ", rq);
            total_requested += rq;
            mc.async_request_space(rq, yield);
            trace("Got requested space ", rq);
        }
    };

    auto producer = [&](yield_context yield) {
        while(std::cin) {
            trace("Press ENTER..."), std::cin.ignore(1024, '\n');
            trace("Total: ", total_requested);
            if (total_requested > 0) {
                trace("freeing");
                mc.free_space(1, yield);
                total_requested -= 1;
            }
        }
        trace("producer gone");
    };

    using std::placeholders::_1;
    spawn(ctx, bind(consumer, 1, _1));
    spawn(ctx, bind(consumer, 2, _1));
    spawn(ctx, bind(consumer, 3, _1));
    spawn(ctx, bind(consumer, 4, _1));
    spawn(ctx, producer);

    ctx.join();
}

Live locally: (screen capture of a local run not reproduced here)

Here's a version with extensive debug output and assertions to check our understanding of the executors:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <deque>
#include <fmt/ranges.h>
#include <iostream>

namespace asio   = boost::asio;
using error_code = boost::system::error_code;
using asio::yield_context;

std::mutex console_mx;
void trace(auto const& fmt, auto const&... args) {
    std::lock_guard lk(console_mx);
    fmt::print(fmt::runtime(fmt), args...);
    fmt::print("\n");
    std::fflush(stdout);
}

class MemoryChecker {
  public:
    using Size = size_t;

    inline MemoryChecker(asio::any_io_executor ctx, Size total_mem, Size initial_fill = 0)
        : total_(total_mem)
        , in_use_(initial_fill)
        , strand_(ctx) {}

    inline void dump(std::string_view fmt, auto const&... args) const {
        assert(strand_.running_in_this_thread());
        trace("MemoryChecker total_:{}, in_use_:{}, queue_: {} msg: {}", total_, in_use_, queue_,
              fmt::format(fmt::runtime(fmt), args...));
    }

    inline void async_request_space(Size rq, yield_context yield) {
        if (!rq)
            return;

        post(strand_, yield);
        assert(strand_.running_in_this_thread());

        dump("async_request_space {}", rq);

        if (rq > total_)
            throw std::logic_error("async_request_space");

        if (queue_.empty() && get_available_mem() >= rq) {
            in_use_ += rq;
            dump("immediately satisfied rq={}", rq);
            assert(strand_.running_in_this_thread());  // !!
            post(yield);                               // disengage from strand_
            assert(!strand_.running_in_this_thread()); // !!
        } else {
            assert(strand_.running_in_this_thread());  // !!
            queue_.push_back(rq);
            dump("queued, waiting rq={}", rq);

            async_wait(yield); // disengages from strand_
            assert(!strand_.running_in_this_thread());  // !!

            trace("completed rq={}", rq);
        }

        assert(!strand_.running_in_this_thread()); // !!
    }

    inline void free_space(Size n, yield_context yield) {
        post(strand_, yield);
        assert(strand_.running_in_this_thread());

        dump("freeing {}", n);
        in_use_ -= n;
        dump("freed {}", n);

        while (!queue_.empty() && get_available_mem() >= queue_.front()) {
            auto rq = queue_.front();
            queue_.pop_front();

            dump("signalling request {}", rq);
            event_.cancel_one();

            in_use_ += rq;
            dump("signalled {}", rq);
        }
        dump("exit free ({})", n);
    }

  private:
    Size get_available_mem() const {
        assert(strand_.running_in_this_thread());
        assert(total_ >= in_use_);
        return total_ - in_use_;
    }

    void async_wait(yield_context yield) {
        error_code ignored;
        event_.async_wait(yield[ignored]); // disengages _strand because yield has its own associated executor
        assert(ignored == asio::error::operation_aborted);
    }

    const Size total_;
    Size       in_use_;

    std::deque<Size> queue_;

    asio::strand<asio::any_io_executor> strand_;
    asio::steady_timer event_{strand_, asio::steady_timer::time_point::max()};
};

int main() {
    asio::thread_pool ctx; // require >1 because of blocking IO in "freeer" (producer) task
    size_t            total_requested{0};
    MemoryChecker     mc{ctx.get_executor(), 4};

    auto consumer = [&](size_t rq, yield_context yield) {
        while (true) {
            trace("Requesting {}", rq);
            total_requested += rq;
            mc.async_request_space(rq, yield);
            trace("Got requested space {}", rq);
        }
    };

    auto producer = [&](yield_context yield) {
        while(std::cin) {
            trace("Press ENTER..."), std::cin.ignore(1024, '\n');
            trace("Total: {}", total_requested);
            if (total_requested > 0) {
                trace("freeing");
                mc.free_space(1, yield);
                total_requested -= 1;
            }
        }
        trace("producer gone");
    };

    using std::placeholders::_1;
    spawn(ctx, bind(consumer, 1, _1));
    spawn(ctx, bind(consumer, 2, _1));
    spawn(ctx, bind(consumer, 3, _1));
    spawn(ctx, bind(consumer, 4, _1));
    spawn(ctx, producer);

    ctx.join();
}

Live locally: (screen capture of a local run not reproduced here)

sehe
  • I learned a lot. I tried to really grok your code. I'm sorry that failed. However, I think the breakthrough came when I re-imagined using timer `cancel_one()` so it was probably worth it? – sehe May 23 '23 at 17:40
  • Thanks for your response (and your code review!). I do seem to have found the 'bug', which is basically this: https://stackoverflow.com/questions/71019704/boostasiostrand-merges-multiple-handlers-into-one – Suraaj K S May 23 '23 at 21:12
  • Moreover, in the future, it may be possible that multiple threads call `ioctx.run()`, which is why I do not assume the 'implicit strand' argument. – Suraaj K S May 23 '23 at 21:56
  • Yeah I got that. Note my alternatives all use multiple threads just to make sure I tested with the constraints. – sehe May 23 '23 at 21:57
  • I have a few comments / questions: 1. You `post(yield)` to 'release' the strand. This means that you are using the default `system_executor`. Is there any reason why you don't post to the `thread_pool`? 2. Why do you say that you need >1 thread in your `thread_pool`? 3. Finally, the reason I'm mixing strands and mutexes is that you cannot 'hold' many strands at a time. When you post to a strand, you are released from the current strand, which is a limitation I would like to avoid. – Suraaj K S May 23 '23 at 22:21
  • Here are answers to some of your questions: 1. "In request_space you randomly create a work guard. Why?" -- When I was testing the application, the ioctx 'stopped' after the `async_submit`. 2. "`async_submit` "submits" an atomic_action "async" - but it doesn't?" -- `async_submit` actually does not submit that, but a completion token (the first argument). In hindsight, I should not have named the parameter `atomic_action`, but rather something like `extra_action`. I needed it so that my mutex could be released **after** posting to the queue. Thanks again for your time. – Suraaj K S May 23 '23 at 22:21
  • 1. "This means that you are using the default system_executor" - nope, I could be wrong, but my comment states my assumption: `yield_context` has an associated executor (see https://stackoverflow.com/questions/52458609/which-io-context-does-stdboostasiopost-dispatch-use/52460852#52460852 and e.g. observe `yield.get_executor()`). – sehe May 23 '23 at 22:26
  • 2. The full comment reads _"// require >1 because of blocking IO in "freeer" (producer) task"_. I don't really know what to add. `std::cin` is blocking IO. If there weren't a second service thread, it would block all handlers. – sehe May 23 '23 at 22:27
  • 3. Mmm. I think it's a common thing to avoid overlapping locks (it leads to classical deadlocks). But I get your point. I don't usually miss that "feature" myself. – sehe May 23 '23 at 22:29
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/253791/discussion-between-sehe-and-suraaj-k-s). – sehe May 23 '23 at 22:30
  • Coming from https://stackoverflow.com/questions/76794580/using-a-boost-asio-strand-as-a-mutex-does-not-work-with-coroutines/ here: `post(strand_, yield)`, should really be `post(bind_executor(strand_,yield))`, I think? – Suraaj K S Jul 31 '23 at 02:25
  • @SuraajKS Mmm. Yes! Good call. Weirdly I missed that earlier, and the asserts seemed to completely pin down the behaviour as expected. I just retested and it still seemed fine with both spellings. HOWEVER, when I reduce the code and it becomes much more tightly timed, it does show that the asserts fail without `bind_executor`: http://coliru.stacked-crooked.com/a/705417c78517fa80 – sehe Jul 31 '23 at 03:27
  • The most surprising bit is, though, that even with `bind_executor` applied, the behaviour is only as expected since Boost 1.80.0 (specifically since commit 5bbdc9b, "Added new spawn() overloads that conform to the requirements for asynchronous operations", https://github.com/boostorg/asio/commit/5bbdc9b7093b7c862a3a94e40b3f4e379f8ca1e8). These overloads also support cancellation. When targeting C++11 and later these functions are implemented in terms of Boost.Context directly. The existing overloads have been retained but are deprecated. – sehe Jul 31 '23 at 03:31
  • I think I'm going to stand by my advisory in the answer: _"Let's also realize that stackful coroutines implicitly are a strand. Code is sequenced. I'm not sure it is even possible to change executors within a stackful coroutine (I assume there is an actual strand internal to the yield context, and it will take precedence)."_. As long as you diligently keep the asserts in your code (and provided you test diligently with debug builds) it may be fine if your code base really needs the "switch to strand during coro" behaviour. – sehe Jul 31 '23 at 03:36