1

I have a sequence of nested async operations on an asio::ip::tcp::socket and I want to have a possibility to stop it as fast as possible from an other thread. Here is some pseudo:

socket->async_something(buffer, CallbackLambdaHeader
{
    if (errorcode)
    {
        //exit the sequence
    }
    // some code
    socket->async_something(buffer, CallbackLambdaHeader
    {
        if (errorcode)
        {
            //exit the sequence
        }
        // some code
        socket->async_something(buffer, CallbackLambdaHeader
        {
            if (errorcode)
            {
                //exit the sequence
            }
            // etc
        });
    });
});

The problem is that a simple socket->cancel() call wont always work because in the moment of the call one of the callbacks may be running. So there will be no enqueued operations to cancel, but a running callback can add one soon and continue the sequence. I want the async operations after socket->cancel() to be cancelled too (instantly complete with operation_aborted error).

The only idea is to somehow wrap every call in a mutex.lock() and use a cancellation flag

//thread1:
mutex.lock();
cancel = true;
socket->cancel();
mutex.unlock();

//thread 2:
mutex.lock();
if (!cancel)
{
    socket->async_something(buffer, CallbackLambdaHeader
    {
        if (errorcode)
        {
            //exit the sequence
        }
        // some code
        mutex.lock();
        if (!cancel)
        {
            socket->async_something(buffer, CallbackLambdaHeader
            {
                if (errorcode)
                {
                    //exit the sequence
                }
                // some code
                mutex.lock();
                if (!cancel)
                {
                    socket->async_something(buffer, CallbackLambdaHeader
                    {
                        if (errorcode)
                        {
                            //exit the sequence
                        }
                        // etc
                    });
                }
                else
                {
                    //exit the sequence
                }
                mutex.unlock();
            });
        }
        else
        {
            //exit the sequence
        }
        mutex.unlock();
    });
}
else
{
    //exit the sequence
}
mutex.unlock();

But it looks awful.
Have heard about io_service::strand, but not sure how to use it here. Use .post() with the socket->cancel() method? Is there any guarantee that situation {callback, cancel, callback} is impossible?

Acuion
  • 25
  • 4

1 Answers1

0

{callback, cancel, callback} is always possible, this is not what you want to avoid. In fact, any cancel will always result in any pending operation completing, so there will be a callback at least for those (with an error code indicating the operation was aborted).

What you want/need to avoid is scheduling new tasks once you know the socket had been "canceled" (the cancel operation doesn't do that: it cancels operations, not sockets).

I see two things, depending on the situation one could be what you want:

  1. If you want to close a socket and all operations on it, permanently

    I'd simply post a task to close the socket (which will also cancel pending operations, of course). The benefit is that you can check the socket state [sock.is_open]http://www.boost.org/doc/libs/1_65_1/doc/html/boost_asio/reference/basic_stream_socket/is_open.html) before accidentally scheduling new operations for that socket.

    _service.post([this] { sock.close(); });
    

    You need a strand to avoid data races accessing the sock variable iff you have multiple threads run-ning the service. See Why do I need strand per connection when using boost::asio?

  2. If you want to cancel specific operation chains leaving the socket open¹, you could have a flag with each socket, that indicates when operations were canceled.

    All the completion handlers involved will already receive an error_code containing boost::asio::error::operation_aborted so it should be really easy to avoid doing "follow up" work in that chain.

    Just put something like this in the callback lambda header:

        if (ec == boost::asio::error::operation_aborted)
             return;
    

    In fact you might want to do that on most errors.

        if (ec)
             return; // log `ec.message()`?
    

    The flag is merely advantageous if some other thread was just about to post a new independent async operation (chain) and you want to block that too (again without closing the socket, otherwise use solution #1).

    No synchronization is needed on that socket, because you don't share it with anyone that didn't already share the sock. sock is also not thread-safe, so you know accessing the flag must be safe.

    Again, if that weren't true you needed a strand to begin with.

    >

    Having a single service thread means you have an implicit strand, and you can avoid the complexity even though you might have many threads posting tasks by never posting async operations directly:

       _service.post([=] {  if (!_sock_canceled_flag) _sock.async_something (....
                  ); });
    

    This way ensure all socket operations are on the implicit strand.


¹ I actually can't think of a protocol in which this could make sense, but perhaps for a datagram protocol (?)

sehe
  • 374,641
  • 47
  • 450
  • 633
  • *"I'd simply post a task to close the socket"* Yes, sounds like what I need. But is a situation {in-strand callback ends, new piece of data arrives to the socket, new callback enqueues to the strand, the closing thread enqueues close() method} now impossible? – Acuion Dec 04 '17 at 22:28
  • I'm confused. Callbacks never "spontaneously enqueue" when data arrives. Read operations _need_ to be posted on that strand, first. I'm unsure what else you could want to ask. – sehe Dec 04 '17 at 22:59