2

I have a boost::asio::ssl::stream<boost::asio::ip::tcp::socket> whose io_context is serviced by a thread, Tskt. I have a "write loop" that asynchronously writes to the socket, invoked from a different thread, Tsnd:

void handleBufferWritten(
    boost::system::error_code const&          ec,
    std::shared_ptr<std::vector<char>> const& pSrc, std::size_t curSrcPos,
    boost::asio::ssl::stream<boost::asio::ip::tcp::socket>& rSkt) {
    if (ec)
        return;
    if (!pSrc)
        return;
    static auto const MaxToWrite            = 200;
    auto const        howManyBytesRemaining = pSrc->size() - curSrcPos;
    if (howManyBytesRemaining == 0)
        return; // loop exit
    // else there are more bytes to be written:
    auto const howManyBytesToWrite =
        std::min(howManyBytesRemaining, MaxToWrite);
    boost::asio::async_write(rSkt,
        boost::asio::buffer(pSrc->data() + curSrcPos, howManyBytesToWrite),
        [pSrc, nextSrcPos = curSrcPos + howManyBytesToWrite,
         &rSkt](boost::system::error_code const& ec,
                std::size_t /*bytesTransferred*/) {
            handleBufferWritten(ec, pSrc, nextSrcPos, rSkt);
        });
}

void startBufferWrite(
    std::vector<char> const&                                src,
    boost::asio::ssl::stream<boost::asio::ip::tcp::socket>& rSkt) {
    auto pBuf = std::make_shared<std::vector<char>>(src);
    // start loop:
    handleBufferWritten(boost::system::error_code{}, pBuf, 0, rSkt);
}

I built with "-fsanitize=thread" and it warned me of a data race. As a result of my analysis, I believe that there is no resolution between 2 threads contending on a deadline_timer's expiry point (Tskt thread writing: https://github.com/boostorg/asio/blob/25dc6780c2c73dd6a4d74e65e854fc0f705cbb60/include/boost/asio/ssl/detail/io.hpp#L182, Tsnd thread reading: https://github.com/boostorg/asio/blob/25dc6780c2c73dd6a4d74e65e854fc0f705cbb60/include/boost/asio/ssl/detail/io.hpp#L179). (I'm using boost1.67, but the difference between this and the current version is minimal.)

Here's the ThreadSanitizer warning:

==================
WARNING: ThreadSanitizer: data race (pid=19106)
  Write of size 8 at 0x7b440000ff78 by thread T4:
    #0 boost::asio::detail::deadline_timer_service<boost::asio::time_traits<boost::posix_time::ptime> >::expires_at(boost::asio::detail::deadline_timer_service<boost::asio::time_traits<boost::posix_time::ptime> >::implementation_type&, boost::posix_time::ptime const&, boost::system::error_code&) /boost-1_67/boost/asio/detail/deadline_timer_service.hpp:194 (MyApp+0x000000a6ac6c)
    #1 boost::asio::basic_deadline_timer<boost::posix_time::ptime, boost::asio::time_traits<boost::posix_time::ptime> >::expires_at(boost::posix_time::ptime const&) /boost-1_67/boost/asio/basic_deadline_timer.hpp:439 (MyApp+0x000000a684a1)
    #2 boost::asio::ssl::detail::io_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::read_op<boost::asio::mutable_buffers_1>, std::function<void (boost::system::error_code const&, unsigned long)> >::operator()(boost::system::error_code, unsigned long, int) /boost-1_67/boost/asio/ssl/detail/io.hpp:182 (MyApp+0x000000a6ea5b)
    #3 boost::asio::detail::binder2<boost::asio::ssl::detail::io_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::read_op<boost::asio::mutable_buffers_1>, std::function<void (boost::system::error_code const&, unsigned long)> >, boost::system::error_code, unsigned long>::operator()() /boost-1_67/boost/asio/detail/bind_handler.hpp:164 (MyApp+0x000000a802aa)
    #4 void boost::asio::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::ssl::detail::io_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::read_op<boost::asio::mutable_buffers_1>, std::function<void (boost::system::error_code const&, unsigned long)> >, boost::system::error_code, unsigned long> >(boost::asio::detail::binder2<boost::asio::ssl::detail::io_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::read_op<boost::asio::mutable_buffers_1>, std::function<void (boost::system::error_code const&, unsigned long)> >, boost::system::error_code, unsigned long>&, ...) /boost-1_67/boost/asio/handler_invoke_hook.hpp:69 (MyApp+0x000000a7f966)
    #5 void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::ssl::detail::io_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::read_op<boost::asio::mutable_buffers_1>, std::function<void (boost::system::error_code const&, unsigned long)> >, boost::system::error_code, unsigned long>, std::function<void (boost::system::error_code const&, unsigned long)> >(boost::asio::detail::binder2<boost::asio::ssl::detail::io_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::read_op<boost::asio::mutable_buffers_1>, std::function<void (boost::system::error_code const&, unsigned long)> >, boost::system::error_code, unsigned long>&, std::function<void (boost::system::error_code const&, unsigned long)>&) /boost-1_67/boost/asio/detail/handler_invoke_helpers.hpp:37 (MyApp+0x000000a7e8d0)
    #6 void boost::asio::ssl::detail::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::ssl::detail::io_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::read_op<boost::asio::mutable_buffers_1>, std::function<void (boost::system::error_code const&, unsigned long)> >, boost::system::error_code, unsigned long>, boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::read_op<boost::asio::mutable_buffers_1>, std::function<void (boost::system::error_code const&, unsigned long)> >(boost::asio::detail::binder2<boost::asio::ssl::detail::io_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::read_op<boost::asio::mutable_buffers_1>, std::function<void (boost::system::error_code const&, unsigned long)> >, boost::system::error_code, unsigned long>&, boost::asio::ssl::detail::io_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::read_op<boost::asio::mutable_buffers_1>, std::function<void (boost::system::error_code const&, unsigned long)> >*) /boost-1_67/boost/asio/ssl/detail/io.hpp:316 (MyApp+0x000000a7c878)
    #7 void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::ssl::detail::io_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::read_op<boost::asio::mutable_buffers_1>, std::function<void (boost::system::error_code const&, unsigned long)> >, boost::system::error_code, unsigned long>, boost::asio::ssl::detail::io_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::read_op<boost::asio::mutable_buffers_1>, std::function<void (boost::system::error_code const&, unsigned long)> > >(boost::asio::detail::binder2<boost::asio::ssl::detail::io_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::read_op<boost::asio::mutable_buffers_1>, std::function<void (boost::system::error_code const&, unsigned long)> >, boost::system::error_code, unsigned long>&, boost::asio::ssl::detail::io_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::read_op<boost::asio::mutable_buffers_1>, std::function<void (boost::system::error_code const&, unsigned long)> >&) /boost-1_67/boost/asio/detail/handler_invoke_helpers.hpp:37 (MyApp+0x000000a7b736)
    #8 void boost::asio::detail::handler_work<boost::asio::ssl::detail::io_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::read_op<boost::asio::mutable_buffers_1>, std::function<void (boost::system::error_code const&, unsigned long)> >, boost::asio::system_executor>::complete<boost::asio::detail::binder2<boost::asio::ssl::detail::io_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::read_op<boost::asio::mutable_buffers_1>, std::function<void (boost::system::error_code const&, unsigned long)> >, boost::system::error_code, unsigned long> >(boost::asio::detail::binder2<boost::asio::ssl::detail::io_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::read_op<boost::asio::mutable_buffers_1>, std::function<void (boost::system::error_code const&, unsigned long)> >, boost::system::error_code, unsigned long>&, boost::asio::ssl::detail::io_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::read_op<boost::asio::mutable_buffers_1>, std::function<void (boost::system::error_code const&, unsigned long)> >&) /boost-1_67/boost/asio/detail/handler_work.hpp:82 (MyApp+0x000000a79dc5)
    #9 boost::asio::detail::reactive_socket_recv_op<boost::asio::mutable_buffers_1, boost::asio::ssl::detail::io_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::read_op<boost::asio::mutable_buffers_1>, std::function<void (boost::system::error_code const&, unsigned long)> > >::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) /boost-1_67/boost/asio/detail/reactive_socket_recv_op.hpp:122 (MyApp+0x000000a779b0)
    #10 boost::asio::detail::scheduler_operation::complete(void*, boost::system::error_code const&, unsigned long) /boost-1_67/boost/asio/detail/scheduler_operation.hpp:40 (MyApp+0x00000096f0c6)
    #11 boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&) /boost-1_67/boost/asio/detail/impl/scheduler.ipp:401 (MyApp+0x0000009707ff)
    #12 boost::asio::detail::scheduler::run(boost::system::error_code&) /boost-1_67/boost/asio/detail/impl/scheduler.ipp:154 (MyApp+0x0000009704b5)
    #13 boost::asio::io_context::run() /boost-1_67/boost/asio/impl/io_context.ipp:62 (MyApp+0x000000970aa0)
    #14 operator() /src/MyThread.cpp:24 (MyApp+0x0000009e3101)
    #15 __invoke_impl<void, MyThread::start()::<lambda()> > /opt/rh/devtoolset-7/root/usr/include/c++/7/bits/invoke.h:60 (MyApp+0x0000009e35ab)
    #16 __invoke<MyThread::start()::<lambda()> > /opt/rh/devtoolset-7/root/usr/include/c++/7/bits/invoke.h:95 (MyApp+0x0000009e32da)
    #17 _M_invoke<0> /opt/rh/devtoolset-7/root/usr/include/c++/7/thread:234 (MyApp+0x0000009e3b14)
    #18 operator() /opt/rh/devtoolset-7/root/usr/include/c++/7/thread:243 (MyApp+0x0000009e3aa6)
    #19 _M_run /opt/rh/devtoolset-7/root/usr/include/c++/7/thread:186 (MyApp+0x0000009e3a4c)
    #20 execute_native_thread_routine <null> (MyApp+0x0000010e6f7e)

  Previous read of size 8 at 0x7b440000ff78 by thread T3 (mutexes: write M1465, write M1880, write M1853):
    #0 boost::asio::detail::deadline_timer_service<boost::asio::time_traits<boost::posix_time::ptime> >::expires_at(boost::asio::detail::deadline_timer_service<boost::asio::time_traits<boost::posix_time::ptime> >::implementation_type const&) const /boost-1_67/boost/asio/detail/deadline_timer_service.hpp:180 (MyApp+0x000000a6ad70)
    #1 boost::asio::basic_deadline_timer<boost::posix_time::ptime, boost::asio::time_traits<boost::posix_time::ptime> >::expires_at() const /boost-1_67/boost/asio/basic_deadline_timer.hpp:411 (MyApp+0x000000a68534)
    #2 boost::asio::ssl::detail::stream_core::expiry(boost::asio::basic_deadline_timer<boost::posix_time::ptime, boost::asio::time_traits<boost::posix_time::ptime> > const&) /boost-1_67/boost/asio/ssl/detail/stream_core.hpp:84 (MyApp+0x000000a66974)
    #3 boost::asio::ssl::detail::io_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::write_op<boost::asio::const_buffers_1>, boost::asio::detail::write_op<boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boost::asio::mutable_buffer const*, boost::asio::detail::transfer_all_t, std::function<void (boost::system::error_code const&, unsigned long)> > >::operator()(boost::system::error_code, unsigned long, int) /boost-1_67/boost/asio/ssl/detail/io.hpp:179 (MyApp+0x000000a760ac)
    #4 void boost::asio::ssl::detail::async_io<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::write_op<boost::asio::const_buffers_1>, boost::asio::detail::write_op<boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boost::asio::mutable_buffer const*, boost::asio::detail::transfer_all_t, std::function<void (boost::system::error_code const&, unsigned long)> > >(boost::asio::basic_stream_socket<boost::asio::ip::tcp>&, boost::asio::ssl::detail::stream_core&, boost::asio::ssl::detail::write_op<boost::asio::const_buffers_1> const&, boost::asio::detail::write_op<boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boost::asio::mutable_buffer const*, boost::asio::detail::transfer_all_t, std::function<void (boost::system::error_code const&, unsigned long)> >&) /boost-1_67/boost/asio/ssl/detail/io.hpp:334 (MyApp+0x000000a73838)
    #5 boost::asio::async_result<std::decay<boost::asio::detail::write_op<boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boost::asio::mutable_buffer const*, boost::asio::detail::transfer_all_t, std::function<void (boost::system::error_code const&, unsigned long)> > >::type, void (boost::system::error_code, unsigned long)>::return_type boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp> >::async_write_some<boost::asio::const_buffers_1, boost::asio::detail::write_op<boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boost::asio::mutable_buffer const*, boost::asio::detail::transfer_all_t, std::function<void (boost::system::error_code const&, unsigned long)> > >(boost::asio::const_buffers_1 const&, boost::asio::detail::write_op<boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boost::asio::mutable_buffer const*, boost::asio::detail::transfer_all_t, std::function<void (boost::system::error_code const&, unsigned long)> >&&) /boost-1_67/boost/asio/ssl/stream.hpp:653 (MyApp+0x000000a718cc)
    #6 boost::asio::detail::write_op<boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boost::asio::mutable_buffer const*, boost::asio::detail::transfer_all_t, std::function<void (boost::system::error_code const&, unsigned long)> >::operator()(boost::system::error_code const&, unsigned long, int) /boost-1_67/boost/asio/impl/write.hpp:259 (MyApp+0x000000a6f084)
    #7 void boost::asio::detail::start_write_buffer_sequence_op<boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boost::asio::mutable_buffer const*, boost::asio::detail::transfer_all_t, std::function<void (boost::system::error_code const&, unsigned long)> >(boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp> >&, boost::asio::mutable_buffers_1 const&, boost::asio::mutable_buffer const* const&, boost::asio::detail::transfer_all_t, std::function<void (boost::system::error_code const&, unsigned long)>&) /boost-1_67/boost/asio/impl/write.hpp:345 (MyApp+0x000000a6bcc0)
    #8 boost::asio::async_result<std::decay<std::function<void (boost::system::error_code const&, unsigned long)> const&>::type, void (boost::system::error_code, unsigned long)>::return_type boost::asio::async_write<boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, std::function<void (boost::system::error_code const&, unsigned long)> const&>(boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp> >&, boost::asio::mutable_buffers_1 const&, std::function<void (boost::system::error_code const&, unsigned long)> const&, std::enable_if<boost::asio::is_const_buffer_sequence<boost::asio::mutable_buffers_1>::value, void>::type*) /boost-1_67/boost/asio/impl/write.hpp:435 (MyApp+0x000000a692a0)
...
    #10 startBufferWrite() /src/startBufferWrite.cpp:163 (MyApp+0x000000a8384c)
...
    #48 boost::asio::detail::scheduler::do_wait_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, long, boost::system::error_code const&) /boost-1_67/boost/asio/detail/impl/scheduler.ipp:481 (MyApp+0x000000f33030)
...
SUMMARY: ThreadSanitizer: data race /boost-1_67/boost/asio/detail/deadline_timer_service.hpp:194 in boost::asio::detail::deadline_timer_service<boost::asio::time_traits<boost::posix_time::ptime> >::expires_at(boost::asio::detail::deadline_timer_service<boost::asio::time_traits<boost::posix_time::ptime> >::implementation_type&, boost::posix_time::ptime const&, boost::system::error_code&)
==================

For now I'm simply removing all my code that uses asio ssl. I'd like to be able to put it back in but right now I'm getting crashes and hangs when it's in that I'm not getting when it's out.

Fixes I've considered:

  1. Add a std::mutex member to the boost::asio::detail::deadline_timer_service<Time_Traits>::implementation_type class. But that would potentially penalize code that has nothing to do with sockets or ssl, to no particular benefit (that I can see).
  2. Replace the stream_core's deadline_timer objects with explicit continuation queues. (would require adding explicit invocation of the continuations, too)
  3. (already rejected, but I did consider:) post the initial write to the socket's io_context: this would be broken as soon as I (or another developer) decide to dedicate more than one thread to that context (the "posted" write could be running on one of the context's threads while the previously-received write is running on the other).

Any ideas?

Here's how Tsan is saying it's relevant:

A. in the "previous read" (thread T3) stack trace:

  1. at trace #5, ssl::stream<>::async_write_some() invokes ssl::stream::detail::async_io()
  2. at trace #4, ssl::stream::detail::async_io() invokes ssl::stream::detail::io_op<...>::operator()
  3. at trace #3, ssl::stream::detail::io_op<...>::operator() invokes ssl::stream::detail::stream_core::expiry(boost::asio::deadline_timer const&). The stream_core class contains 2 deadline_timer objects.
  4. at trace #2, ssl::stream::detail::stream_core::expiry() invokes asio::deadline_timer::expires_at(void)
  5. at trace #1, asio::deadline_timer::expires_at(void) invokes asio::detail::deadline_timer_service<asio::time_traits<boost::posix_time::ptime>>::expires_at()
  6. at trace #0, asio::detail::deadline_timer_service<asio::time_traits<boost::posix_time::ptime>>::expires_at() accesses asio::detail::deadline_timer_service<asio::time_traits<boost::posix_time::ptime>>::implementation_type::expiry, which is of type boost::posix_time::ptime.

B. Meanwhile, in thread T4:

  1. at trace #13, asio::io_context::run() begins to process a continuation. It takes reading all the way to trace #3 to see that it is a boost::asio::ssl::detail::io_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::ssl::detail::read_op<boost::asio::mutable_buffers_1>, std::function<void (boost::system::error_code const&, unsigned long)> >::operator(). What would have posted that io_op to the context? probably a previous invocation of io_op::operator(), upon discovering core_.pending_write_'s expiry to be unequal to core_.neg_infin().
  2. at trace #2, ssl::stream::detail::io_op<...>::operator() invokes asio::deadline_timer::expires_at(boost::posix_time::ptime)
  3. at trace #1, asio::deadline_timer::expires_at(boost::posix_time::ptime) invokes asio::detail::deadline_timer_service<asio::time_traits<ptime> >::expires_at(impl, ptime, error_code)
  4. at trace #0, asio::detail::deadline_timer_service<asio::time_traits<ptime> >::expires_at(impl, ptime, error_code) writes asio::detail::deadline_timer_service<asio::time_traits<boost::posix_time::ptime>>::implementation_type::expiry, which is of type boost::posix_time::ptime.

The deadline_timer instances are fully managed internally to asio. There is no way for me to ensure that each instance of io_op is posted on an appropriate strand--asio simply posts them to the deadline_timer. Therefore: there exists no method to ensure that my invocation of asio::async_write(skt, buf, hf) doesn't read/write a deadline_timer's expiry while one of the socket's io_context's threads is also reading/writing it.

Does this make clear the relevance between my invocation of asio::async_write() on an asio::ssl::stream<> and an interaction with an asio::deadline_timer?

B Simpson
  • 415
  • 2
  • 8
  • 1
    ssl stream [has no member `async_write`](https://www.boost.org/doc/libs/1_67_0/doc/html/boost_asio/reference/ssl__stream.html) and you disregard the bytesTransferred value. That's not gonna be fun to debug. – sehe Apr 22 '21 at 20:12
  • I fail to see the relevance of the code shown to the deadline timers in the threadsan output. There's no deadline timer in your code? – sehe Apr 22 '21 at 20:16
  • Just in case it helps, here's what I cleaned up your question code to in order to read it: http://coliru.stacked-crooked.com/a/e1955af4ed58481a (it fixes some things including the wrong buffer arithmetic). Oh, and ... you can scrap all this and use `boost::asio::[async_]write` composed operation, because it implements precisely such a loop: https://www.boost.org/doc/libs/1_67_0/doc/html/boost_asio/reference/async_write/overload1.html – sehe Apr 22 '21 at 20:41
  • Thanks for the comments! I made a few mistakes in paring down the original code. I probably could have written it as a solitary `boost::asio::async_write()` and still had the same effect since, as you observed, it does what I show behind the scenes. In my actual case, the code is pulling from an asynchronously-populated buffer that can get *very* large, and each time through the loop it actually checks to see if more data has been added. But that is not what is causing the problem, so I removed that complexity from the example. – B Simpson Apr 24 '21 at 02:15
  • Your edit is making me think that you didn't understand what I meant with "because it implements precisely such a loop".See: [_"This operation is implemented in terms of zero or more calls to the stream's async_write_some function, and is known as a composed operation"_](https://www.boost.org/doc/libs/1_67_0/doc/html/boost_asio/reference/async_write/overload1.html#boost-common-heading-doc-spacer:~:text=This%20operation%20is%20implemented%20in%20terms,is%20known%20as%20a%20composed%20operation). It makes no little/no sense to be doing any manual buffer splitting then. – sehe Apr 24 '21 at 02:34
  • Re: _"the code is pulling from an asynchronously-populated buffer that can get very large"_ this is giving me alarm bells. If the buffer is actually concurrently touched that's likely undefined behaviour. I think you should show more code, specifically the timers that your speak of. See https://stackoverflow.com/help/minimal-reproducible-example – sehe Apr 24 '21 at 02:36
  • My code isn't using timers. I just added a section that explains where the timers are in the asio source (hint: they aren't actually invoking based on wall-clock time; they are only ever assigned the expiry times of "negative infinity" and "positive infinity"). Hopefully it's worth your time to take a quick look? Thanks for your follow-ups. – B Simpson Apr 24 '21 at 04:19
  • Here's where asio::ssl::stream is invoking `async_wait()` on its deadline_timer data member: https://github.com/boostorg/asio/blob/25dc6780c2c73dd6a4d74e65e854fc0f705cbb60/include/boost/asio/ssl/detail/io.hpp#L165 – B Simpson Apr 24 '21 at 04:21

2 Answers2

2

I've already pointed out some problems with your question (code) in the comments.

Aside from that, I think there's a pretty simple answer to your question. In your analysis you deep-dive into library internals. Good for you, but also a sure sign that you're looking in the wrong places.

The goal isn't to reimplement Asio in a way that makes your usage of it thread-safe in retrospect. The goal is to find the usage that was thread-safe in the first place.

I'm still going to respond to one of your detail observation, because ... it has MERIT. Not bad, I guess?

  1. Add a std::mutex member to the boost::asio::detail::deadline_timer_service<Time_Traits>::implementation_type class. But that would potentially penalize code that has nothing to do with sockets or ssl, to no particular benefit (that I can see).

Yup. detail and implementation_type - two big "keep out" warnings. However, you're not far off the mark. Mutexes and asynchrony don't jibe well together, but in the end, executors can (and should) guard critical sections in some places.

Strands

And they do. In fact, Asio has an executor service that has an implementation type that does exactly that: hold a mutex.

Of course, that's also an implementation detail. You don't need to think about the mutex, all you care about is sequenced execution. This is what Asio calls strands of execution and this is indeed the service that implements strands.

So what you want to do is launch your operation on a strand. That way when an asynchronous event happens¹ you can (should) post the follow up on the same strand. The executor service then ensure that execution happens "on the strand" - meaning, in mutual exclusion with any other handler on the strand.

  • The main article explaining strands is here: Strands: Use Threads Without Explicit Locking
  • Tanner's answer keeps being amazing: Why do I need strand per connection when using boost::asio?
  • Note that the strand service has some smarts:
    • it may have more efficient synchronization primitives on some platforms
    • it may optimize away synchronization (e.g. when dispatch-ing from a handler on the current strand)
    • it might optimize all synchronization (e.g. when the io service is hinted with a non-concurrency hint)
    • in highly scalable solutions you would normally need to worry about excessive numbers of OS synchronization objects. In the Asio implementation, strands are like hash buckets and all strands sort into one of the buckets, so mutual exclusion guarantee is always met, but with an uppoer bound on the number of synchronization primitives.

Crystal Ball

So, I don't know what your code looks like, but in all likelihood you might have some

my_timer.async_wait([](error_code ec) {
    if (!ec) // asio::error::operation_aborted
        rSkt.cancel();
});

And you need to replace that with something like

my_timer.async_wait([](error_code ec) {
    if (!ec) // asio::error::operation_aborted
        my_strand.post([&rSkt] { rSkt.cancel(); });
});

May I suggest upgrading to a version of boost that has the newer executor model? It's a lot nicer to work with, because you can - indeed - tie your IO object to the strand directly, making it much easier to write the code correctly.

¹ (that isn't logically sequenced due to async call chain - a so-called logical strand)

sehe
  • 374,641
  • 47
  • 450
  • 633
0

In response to the later clarifications:

My code isn't using timers. I just added a section that explains where the timers are in the asio source (hint: they aren't actually invoking based on wall-clock time; they are only ever assigned the expiry times of "negative infinity" and "positive infinity"). Hopefully it's worth your time to take a quick look? Thanks for your follow-ups. – B Simpson 8 hours ago

Here's where asio::ssl::stream is invoking async_wait() on its deadline_timer data member: github.com/]oostorg/asio/blob/… – B Simpson 8 hours ago

Okay. Now it's becoming clearer. Indeed, you linked the only place where pending_write_ and pending_read_ are ever being asynchronously awaited.

The Design

In the context of io_op it looks safe: the entire thing is a strictly sequentially chained state machine. The async_wait can be expected to never be hit.

As an aside, here's a minimal, freely-threaded example for boost 1.65 through 1.75 that demonstrates this: Live On Compiler Explorer

The stream_core is an owned property of the ssl::stream. Separate io operations share it by reference.

So, if the engine encounters other async read/write operations in-flight, this should indicate you have another asynchronous IO operation on the same stream. That explains it reaches the async_wait.

Is That A Problem?

If the timers are being awaited, their handlers are invoked in the manner dictated by the original completion handler.

This is due to the extension point asio_handler_invoke being specialized for io_op<>:

template <typename Function, typename Stream,
    typename Operation, typename Handler>
inline void asio_handler_invoke(Function& function,
    io_op<Stream, Operation, Handler>* this_handler)
{
  boost_asio_handler_invoke_helpers::invoke(
      function, this_handler->handler_);
}

This give you the control to decide on locking/synchronization requirement. E.g., if your original handler was strand-wrapped, the timer callbacks will actually be on the same strand.

_In more recent Boost versions this mechanism is replaced by a (logically equivalent) mechanism called associated ewxecutores, see later on.

In other words, no it is not a problem: it's just something to be aware of.

Fixing, Also General Thread Safety

So basically what seems to happen is that you are causing overlapping async operations on the SSL stream. On top of that you're not strand-wrapping the completion handlers, which gives rise to the race condition inside the stream_core state.

Also, mind the thread-safety of the stream object itself (docs):

I went looking for similar verbiage as I linked earlier about composed operations, but all I found was this basic thread-safety information:

SSL and Threads

SSL stream objects perform no locking of their own. Therefore, it is essential that all asynchronous SSL operations are performed in an implicit or explicit strand. Note that this means that no synchronisation is required (and so no locking overhead is incurred) in single threaded programs.

This is separate from the need to strand-wrap completion handlers.

Executors

As mentioned Boost version 1.70.0 and up has the new Executor model. What this allows you to alternatively to manually remembering to strand-wrap your completion handler(s) is to associate the IO object (e.g. your stream with a strand executor alltogether.

That way all operations on the IO object will be associated with that executor. The streamcore's IO objects will also be associated with that same executor (that includes the timer objects). That way it becomes less error prone to remember the strand wrapping and also much easier to change e.g. when the need for threading changes.

sehe
  • 374,641
  • 47
  • 450
  • 633