1

Here is a simple example that makes use of boost::asio::io_context:

https://github.com/unegare/boost-ex/blob/500e46f4d3b41e2abe48e2deccfab39d44ae94e0/main.cpp


    #include <boost/asio.hpp>
    #include <boost/beast/core.hpp>
    #include <boost/beast/http.hpp>
    #include <boost/beast/version.hpp>
    #include <thread>
    #include <vector>
    #include <memory>
    #include <mutex>
    #include <chrono>
    #include <iostream>
    #include <exception>

    // Global mutex locked around the thread-id-prefixed std::cout writes made by
    // the main thread and by the worker thread, so those lines do not interleave.
    std::mutex m_stdout;

    // Callable passed to std::thread: runs the shared io_context on the calling
    // thread and logs when the thread starts and finishes.
    // NOTE(review): every log statement uses manual m_stdout.lock()/unlock(); if the
    // streaming in between throws, the mutex stays locked. std::lock_guard avoids that.
    class MyWorker {
      // Both members share ownership with whoever constructed the worker.
      std::shared_ptr<boost::asio::io_context> io_context;
      std::shared_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_guard;
    public:
      // Copies the two shared pointers that are passed in by non-const reference.
      MyWorker(std::shared_ptr<boost::asio::io_context> &_io_context, std::shared_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> &_work_guard):
        io_context(_io_context), work_guard(_work_guard) {}
      // Copy constructor: shares both pointers with mw and logs that a copy happened.
      MyWorker(const MyWorker &mw): io_context(mw.io_context), work_guard(mw.work_guard) {
        m_stdout.lock();
        std::cout << "[" << std::this_thread::get_id() << "] MyWorker copy constructor" << std::endl;
        m_stdout.unlock();
      }
      // Move constructor: takes over both pointers from mw and logs that a move happened.
      MyWorker(MyWorker &&mw): io_context(std::move(mw.io_context)), work_guard(std::move(mw.work_guard)) {
        m_stdout.lock();
        std::cout << "[" << std::this_thread::get_id() << "] MyWorker move constructor" << std::endl;
        m_stdout.unlock();
      }
      ~MyWorker() {}

      // Thread body: logs "Thread Start", calls io_context->run() until it returns
      // without an error code, then logs "Thread Finish".
      void operator() () {
        m_stdout.lock();
        std::cout << "[" << std::this_thread::get_id() << "] Thread Start" << std::endl;
        m_stdout.unlock();

        while(true) {
          try {
            boost::system::error_code ec;
            io_context->run(ec);
            if (ec) {
              // run() reported an error: log it and call run() again.
              m_stdout.lock();
              std::cout << "[" << std::this_thread::get_id() << "] MyWorker: received an error: " << ec << std::endl;
              m_stdout.unlock();
              continue;
            }
            // run() returned without an error: leave the loop.
            break;
          } catch (std::exception &ex) {
            // An exception propagated out of run(): log it; the loop then calls run() again.
            m_stdout.lock();
            std::cout << "[" << std::this_thread::get_id() << "] MyWorker: caught an exception: " << ex.what() << std::endl;
            m_stdout.unlock();
          }
        }

        m_stdout.lock();
        std::cout << "[" << std::this_thread::get_id() << "] Thread Finish" << std::endl;
        m_stdout.unlock();
      }
    };

    // Sends one hard-coded HTTP GET over a socket supplied by the caller:
    // OnConnect writes the request, OnSend starts a read, OnRecv prints what arrived.
    // The special member functions only exist to log when a Client is created,
    // copied, moved or destroyed.
    class Client: public std::enable_shared_from_this<Client> {
      std::shared_ptr<boost::asio::io_context> io_context;
      std::shared_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_guard;
      std::shared_ptr<boost::asio::ip::tcp::socket> sock;
      // 512-byte buffer used first for the outgoing request and then for the response.
      std::shared_ptr<std::array<char, 512>> buff;
    public:
      // Copies the three shared pointers, allocates the buffer and logs the construction.
      Client(std::shared_ptr<boost::asio::io_context> &_io_context, std::shared_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> &_work_guard, std::shared_ptr<boost::asio::ip::tcp::socket> &_sock):
        io_context(_io_context), work_guard(_work_guard), sock(_sock) {
        buff = std::make_shared<std::array<char,512>>();
        m_stdout.lock();
        std::cout << "[" << std::this_thread::get_id() << "] " << __FUNCTION__ << " with args" << std::endl;
        m_stdout.unlock();
      }
      // Copy constructor: the copy shares io_context, work_guard, sock and buff with cl.
      // Logs "Client copy".
      Client(const Client &cl): io_context(cl.io_context), work_guard(cl.work_guard), sock(cl.sock), buff(cl.buff) {
        m_stdout.lock();
        std::cout << "[" << std::this_thread::get_id() << "] " << __FUNCTION__ << " copy" << std::endl;
        m_stdout.unlock();
      }
      // Move constructor: takes the four shared pointers out of cl, leaving cl's members empty.
      // Logs "Client move".
      Client(Client &&cl): io_context(std::move(cl.io_context)), work_guard(std::move(cl.work_guard)), sock(std::move(cl.sock)), buff(std::move(cl.buff)) {
        m_stdout.lock();
        std::cout << "[" << std::this_thread::get_id() << "] " << __FUNCTION__ << " move" << std::endl;
        m_stdout.unlock();
      }
      // Logs the use counts of the members at destruction time; a moved-from Client reports 0 for each.
      ~Client() {
        m_stdout.lock();
        std::cout << "[" << std::this_thread::get_id() << "] " << __FUNCTION__ << " buff.use_count: " << buff.use_count() << " | sock.use_count: " << sock.use_count() << " | io_context.use_count: " << io_context.use_count() << std::endl;
        m_stdout.unlock();
      }

      // Connect completion handler: on error logs ec, otherwise copies the request into
      // buff and starts an asynchronous write.
      void OnConnect(const boost::system::error_code &ec) {
        // NOTE(review): this write (and the one at the end of the function) is not guarded by m_stdout.
        std::cout << __FUNCTION__ << std::endl;
        if (ec) {
          m_stdout.lock();
          std::cout << "[" << std::this_thread::get_id() << "] " << __FUNCTION__ << ": " << ec << std::endl;
          m_stdout.unlock();
        } else {
    //      buff = std::make_shared<std::array<char, 512>>();
          char req[] = "GET / HTTP/1.1\r\nHost: unegare.info\r\n\r\n";
          // Copies the request without its terminating NUL; the strlen(buff->data()) below
          // therefore relies on the rest of buff still being zero.
          memcpy(buff->data(), req, strlen(req));
          m_stdout.lock();
          std::cout << req << std::endl;
          m_stdout.unlock();
          // NOTE(review): the handler is bound to the raw `this` pointer, so the pending
          // operation does not keep this Client alive; if the object is destroyed before
          // the write completes, OnSend runs on a dangling pointer.
          sock->async_write_some(boost::asio::buffer(buff->data(), strlen(buff->data())), std::bind(std::mem_fn(&Client::OnSend), this, std::placeholders::_1, std::placeholders::_2));
        }
        std::cout << __FUNCTION__ << " use_count: " << buff.use_count() << std::endl;
      }

      // Write completion handler: on error logs ec, otherwise zeroes buff and starts an
      // asynchronous read into it. bytes_transferred is not used.
      void OnSend(const boost::system::error_code &ec, std::size_t bytes_transferred) {
        std::cout << __FUNCTION__ << " use_count: " << io_context.use_count() << std::endl;
        if (ec) {
          m_stdout.lock();
          std::cout << "[" << std::this_thread::get_id() << "] " << __FUNCTION__ << ": " << ec << std::endl;
          m_stdout.unlock();
        } else {
        std::cout << __FUNCTION__ << " use_count: " << buff.use_count() << std::endl;
          buff->fill(0);
        std::cout << __FUNCTION__ << std::endl;
          // NOTE(review): bound to raw `this` again, with the same lifetime hazard as in OnConnect.
          sock->async_read_some(boost::asio::buffer(buff->data(), buff->size()), std::bind(std::mem_fn(&Client::OnRecv), this, std::placeholders::_1, std::placeholders::_2));
        }
      }

      // Read completion handler: on error logs ec, otherwise prints buff as a C string.
      // bytes_transferred is not used.
      void OnRecv(const boost::system::error_code &ec, std::size_t bytes_transferred) {
        std::cout << __FUNCTION__ << std::endl;
        if (ec) {
          m_stdout.lock();
          std::cout << "[" << std::this_thread::get_id() << "] " << __FUNCTION__ << ": " << ec << std::endl;
          m_stdout.unlock();
        } else {
          m_stdout.lock();
          // NOTE(review): printing buff->data() needs a NUL terminator; if the read filled
          // all 512 bytes there is none, and the stream would read past the buffer.
          std::cout << buff->data() << std::endl;
          m_stdout.unlock();
        }
      }
    };

    // Starts one worker thread running the io_context, resolves unegare.info:80,
    // starts an asynchronous connect for a Client, then waits for a character on
    // stdin before releasing the work guard and joining the worker.
    int main () {
      std::shared_ptr<boost::asio::io_context> io_context(std::make_shared<boost::asio::io_context>());
      std::shared_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_guard(
        std::make_shared<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> (boost::asio::make_work_guard(*io_context))
      );
      MyWorker mw(io_context, work_guard);
      std::vector<std::thread> vth;
      vth.reserve(1);
      // The loop body runs exactly once, so a single worker thread is started.
      for (int i = 1; i > 0; --i) {
        // mw is passed as an lvalue, so the thread gets its own copy of the functor
        // (this is where the "MyWorker copy constructor" / "move constructor" lines come from).
        vth.emplace_back(mw);
      }
      // Starts out as a null shared_ptr; assigned inside the try block below.
      std::shared_ptr<Client> cl = 0;
      try {
        boost::asio::ip::tcp::resolver resolver(*io_context);
        boost::asio::ip::tcp::resolver::query query("unegare.info", "80");
        // Takes the first resolved endpoint without checking that one exists.
        boost::asio::ip::tcp::endpoint ep = *resolver.resolve(query);
        m_stdout.lock();
        std::cout << "ep: " << ep << std::endl;
        m_stdout.unlock();

        std::shared_ptr<boost::asio::ip::tcp::socket> sock(std::make_shared<boost::asio::ip::tcp::socket>(*io_context));
        std::shared_ptr<Client> cl2(std::make_shared<Client>(io_context, work_guard, sock));
        cl = cl2->shared_from_this();
        m_stdout.lock();
        std::cout << "HERE: use_count = " << cl.use_count() << std::endl;
        m_stdout.unlock();
        // NOTE(review): the leading `*` dereferences the shared_ptr, so std::bind stores a
        // COPY of the Client by value (the "Client copy" / "Client move" lines in the output).
        // OnConnect then runs on that copy and binds the copy's raw `this` for OnSend; the
        // copy is destroyed together with the handler, leaving a dangling pointer.
        // Passing the shared_ptr itself (no `*`) binds the original object and keeps it alive.
        sock->async_connect(ep, std::bind(std::mem_fn(&Client::OnConnect), *cl2->shared_from_this(), std::placeholders::_1));
        std::this_thread::sleep_for(std::chrono::duration<double>(1));
        m_stdout.lock();
        std::cout << "AFTER CALL" << std::endl;
        m_stdout.unlock();
    //    asm volatile ("");
      } catch (std::exception &ex) {
        m_stdout.lock();
        std::cout << "[" << std::this_thread::get_id() << "] Main Thread: caught an exception: " << ex.what() << std::endl;
        m_stdout.unlock();
      }
      try {
        // Block until the user types a character, then release the work guard.
        char t;
        std::cin >> t;
        work_guard->reset();
    //    std::this_thread::sleep_for(std::chrono::duration<double>(1));
    //    std::cout << "Running" << std::endl;
    //    io_context->run();
      } catch (std::exception &ex) {
        m_stdout.lock();
        std::cout << "[" << std::this_thread::get_id() << "] Main Thread: caught an exception: " << ex.what() << std::endl;
        m_stdout.unlock();
      }
      // Wait for every worker thread to finish.
      std::for_each(vth.begin(), vth.end(), std::mem_fn(&std::thread::join));
      return 0;
    }

stdout:

[140487203505984] MyWorker copy constructor
[140487203505984] MyWorker move constructor
[140487185372928] Thread Start
ep: 95.165.130.37:80
[140487203505984] Client with args
HERE: use_count = 2
[140487203505984] Client copy
[140487203505984] Client move
[140487203505984] ~Client buff.use_count: 0 | sock.use_count: 0 | io_context.use_count: 0
[140487185372928] Client move
[140487185372928] ~Client buff.use_count: 0 | sock.use_count: 0 | io_context.use_count: 0
OnConnect
GET / HTTP/1.1
Host: unegare.info


OnConnect use_count: 2
[140487185372928] ~Client buff.use_count: 2 | sock.use_count: 3 | io_context.use_count: 5
Segmentation Fault (core dumped)

However, I have trouble understanding the segfault, which is caused by a bad reference to the Client object.

I do not understand why cl2 is destroyed after the call to

sock->async_connect(ep, std::bind(std::mem_fn(&Client::OnConnect), *cl2->shared_from_this(), std::placeholders::_1));

at line 162.

Also, why was the copy constructor invoked, as can be seen in the stdout above?

unegare
  • 2,197
  • 1
  • 11
  • 25

2 Answers

4

You don’t track destruction of cl2 with your output: cl2 is a std::shared_ptr<Client>. You seem to track construction and destruction of Client objects.

Your problem is the * in front of cl2->shared_from_this(): that will dereference the std::shared_ptr<Client>. The bind() expression sees a Client& and captures a Client by copy. Removing the * should fix the problem. I haven’t fully understood what the code is trying to do (I’m reading it on a phone) but I guess you actually want to capture the std::shared_ptr<Client> rather than the Client.

Also, as cl2 is already a std::shared_ptr<Client> there is no point in calling shared_from_this() on the pointed-to object. It just creates an unnecessary std::shared_ptr<Client>.

Dietmar Kühl
  • 150,225
  • 13
  • 225
  • 380
  • thank you! But I hadn't previously known the ability to pass `std::shared_ptr` as the first argument for a class-member function to `std::bind`. It's something new to me. – unegare Dec 22 '19 at 22:07
  • 1
    @dronte7: `std::bind(...)` is specified to use `std::invoke(...)` (I think this is the name of the function which deals with the pointer-to-member special case for the function parameter) which can deal with any copyable pointer-like type. It is similar to `std::mem_fn(...)` in that respect which also specified in terms of `std::invoke(...)`. – Dietmar Kühl Dec 22 '19 at 22:12
  • Hehe. There's the short answer school. And then there's my take on things :) I think there were a number of other issues worth pointing out though. – sehe Dec 22 '19 at 22:39
  • 2
    @sehe: there are definitely more things which need fixing in this code snippet. However, doing a full code review on a mobile phone is kind of challenging... – Dietmar Kühl Dec 22 '19 at 22:43
  • Holy. I don't even try to post anything on SO from my phone :) – sehe Dec 23 '19 at 21:21
  • @DietmarKühl Could you give hints about 'other things' to improve, please? – unegare Dec 24 '19 at 12:04
4

It's good that you try to understand. However, start simple!

  1. your copy constructor fails to call the base-class copy constructor (!!) oops
  2. your binds bind to
    • this (which does NOT keep the shared pointer alive)
    • *cl2->shared_from_this() - oops this binds to a COPY of *cl2 by value¹. That is obviously the reason why that COPY is destructed when you're done

The invalid reference arose from the combination of 1. and 2.b.

I'd suggest to simplify. A lot!

  1. Use Rule Of Zero (only declare special members when you need. In this, you introduced a bug because you did something wrong manually writing a copy-constructor that you didn't need)
  2. Use non-owning references where appropriate (not everything needs to be a smart pointer)
  3. Prefer std::unique_ptr for things that do not require shared ownership (shared ownership should be really rare)

  4. Prefer std::lock_guard instead of manually locking and unlocking (that's not exception safe)

  5. Many others: work_guard doesn't need to be copied, it has a reset() member already, catch by const-reference, if you're gonna catch, don't need to use error_code on io_context::run, check the end-point iterator of resolve before dereference, use boost::asio::connect instead so you don't have to check and iterate over different endpoints etc.
  6. prefer std::string over fixed-size buffers if you're doing dynamic allocation anyways, use non-implementation defined chrono durations (for example 1.0s, not duration<double>(1)), consider using boost::thread_group instead of vector<std::thread> or at least only joining threads that are joinable() etc
  7. Suggest to make that async_connect part of Client, since then you can simply bind members the same way as all the rest of the binds (using shared_from_this(), instead of writing the bug you had)
  8. If you have time left, consider using Beast for the Http request and response creation/parsing

Just as a reference, this is what MyWorker could be:

/// Thread functor that drives an io_context it does not own.
/// The referenced io_context must outlive every thread running this worker.
class MyWorker {
    ba::io_context& io_context;

  public:
    MyWorker(ba::io_context& _io_context) : io_context(_io_context) {}

    // Deliberately no user-declared copy/move constructors or destructor:
    // even `= default`ing them can change what the compiler generates.

    /// Runs the io_context until run() returns normally, logging and retrying
    /// whenever an exception escapes from it.
    void operator()() {
        TRACE("Thread Start");

        bool finished = false;
        while (!finished) {
            try {
                io_context.run();
                finished = true; // run() returned normally: stop retrying
            } catch (boost::system::system_error const& se) {
                TRACE("MyWorker: received an error: " << se.code().message());
            } catch (std::exception const& ex) {
                TRACE("MyWorker: caught an exception: " << ex.what());
            }
        }

        TRACE("Thread Finish");
    }
};

At this point, you could very well just make it a lambda:

// Worker body as a lambda: holds the io_context by reference, so the
// io_context must outlive every thread that runs this callable.
auto mw = [&io_context] {
    TRACE("Thread Start");

    // Keep re-entering run() until it returns without throwing.
    for (bool done = false; !done;) {
        try {
            io_context.run();
            done = true;
        } catch (boost::system::system_error const& se) {
            TRACE("MyWorker: received an error: " << se.code().message());
        } catch (std::exception const& ex) {
            TRACE("MyWorker: caught an exception: " << ex.what());
        }
    }

    TRACE("Thread Finish");
};

Much simpler.

The Full Code Simplified

Just see e.g. the simplicity of the new connection code:

// Resolves the host and starts an asynchronous connect; the completion handler
// holds a shared_ptr to this object so it stays alive until OnConnect has run.
void Start() {
    tcp::resolver resolver(io_context);
    auto endpoints = resolver.resolve({"unegare.info", "80"});
    ba::async_connect(sock, endpoints,
        [self=shared_from_this()](error_code ec, tcp::endpoint) { self->OnConnect(ec); });
}

The endpoint is printed when OnConnect is called.

Live On Coliru²

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iomanip>
#include <iostream>
#include <memory>
#include <mutex>

namespace ba = boost::asio;
using ba::ip::tcp;
using namespace std::literals;

// Serializes writes to std::cout across threads.
static std::mutex s_stdout;

// TRACE(expr): writes "[<thread id>] <expr>" plus a newline to std::cout while
// holding s_stdout. `expr` is a stream chain, e.g. TRACE("x=" << x).
// The do { ... } while (0) wrapper makes the expansion a single statement, so
// `TRACE(...);` is safe inside an unbraced if/else (a bare `{ ... }` followed
// by `;` would detach the `else`).
#define TRACE(expr) do { \
    std::lock_guard<std::mutex> lk(s_stdout); \
    std::cout << "[" << std::this_thread::get_id() << "] " << expr << std::endl; \
} while (0)

/// Minimal asynchronous HTTP client: connects to unegare.info:80, sends one GET
/// request and prints the first chunk of the response.
///
/// Instances must be owned by a std::shared_ptr (e.g. via std::make_shared),
/// because every completion handler captures shared_from_this() to keep the
/// object alive while an operation is pending.
class Client : public std::enable_shared_from_this<Client> {
    ba::io_context& io_context; // non-owning: must outlive the client
    tcp::socket sock;
    std::string buf;            // holds the request, then the received bytes

  public:
    Client(ba::io_context& _io_context) : io_context(_io_context), sock{io_context}
    { }

    /// Resolves the host (synchronously; may throw) and starts the asynchronous connect.
    void Start() {
        tcp::resolver resolver(io_context);
        ba::async_connect(sock,
                resolver.resolve({"unegare.info", "80"}),
                std::bind(std::mem_fn(&Client::OnConnect), shared_from_this(), std::placeholders::_1));
    }

    /// Connect completion handler: logs the failure, or logs the peer endpoint
    /// and sends the request.
    void OnConnect(const boost::system::error_code& ec) {
        if (ec) {
            // Report the failure first: asking an unconnected socket for its
            // remote endpoint would throw and hide the real error.
            TRACE(__FUNCTION__ << ": " << ec.message());
            return;
        }

        // Non-throwing overload: a peer that already disconnected only yields
        // a default endpoint in the log instead of an exception.
        boost::system::error_code ep_ec;
        TRACE(__FUNCTION__ << " ep:" << sock.remote_endpoint(ep_ec));

        buf = "GET / HTTP/1.1\r\nHost: unegare.info\r\n\r\n";
        TRACE(std::quoted(buf));
        // async_write keeps writing until the whole request is sent;
        // async_write_some may complete after only part of it.
        ba::async_write(sock, ba::buffer(buf),
           std::bind(std::mem_fn(&Client::OnSend), shared_from_this(), std::placeholders::_1, std::placeholders::_2));
    }

    /// Write completion handler: on success, prepares a 512-byte buffer and starts one read.
    void OnSend(const boost::system::error_code& ec, std::size_t bytes_transferred) {
        if (ec) {
            TRACE(__FUNCTION__ << ": " << ec.message() << " and bytes_transferred: " << bytes_transferred);
        } else {
            TRACE(__FUNCTION__);
            buf.assign(512, '\0');
            sock.async_read_some(ba::buffer(buf), std::bind(std::mem_fn(&Client::OnRecv), shared_from_this(), std::placeholders::_1, std::placeholders::_2));
        }
    }

    /// Read completion handler: on success, trims the buffer to the bytes received and prints it.
    void OnRecv(const boost::system::error_code& ec, std::size_t bytes_transferred) {
        TRACE(__FUNCTION__);
        if (ec) {
            TRACE(__FUNCTION__ << ": " << ec.message() << " and bytes_transferred: " << bytes_transferred);
        } else {
            buf.resize(bytes_transferred);
            TRACE(std::quoted(buf));
        }
    }
};

// Runs the io_context on one worker thread, starts a Client, and waits for a
// character on stdin before letting the worker finish.
int main() {
    ba::io_context io_context;
    // Keeps io_context.run() from returning while there is no pending work.
    auto work_guard = make_work_guard(io_context);

    boost::thread_group vth;

    auto mw = [&io_context] {
        TRACE("Thread Start");

        while (true) {
            try {
                io_context.run();
                break; // normal end
            } catch (boost::system::system_error const& se) {
                TRACE("MyWorker: received an error: " << se.code().message());
            } catch (std::exception const& ex) {
                TRACE("MyWorker: caught an exception: " << ex.what());
            }
        }

        TRACE("Thread Finish");
    };

    vth.create_thread(mw);

    try {
        std::make_shared<Client>(io_context)->Start();

        char t;
        std::cin >> t;
    } catch (std::exception const& ex) {
        TRACE("Main Thread: caught an exception: " << ex.what());
    }

    // Release the guard on every path: if Start() threw (e.g. name resolution
    // failed) and the guard were still held, run() would never return and
    // join_all() would block forever.
    work_guard.reset();

    vth.join_all();
}

Prints:

[140095938852608] Thread Start
[140095938852608] OnConnect ep:95.165.130.37:80
[140095938852608] "GET / HTTP/1.1
Host: unegare.info

"
[140095938852608] OnSend
[140095938852608] OnRecv
[140095938852608] "HTTP/1.1 200 OK
Date: Sun, 22 Dec 2019 22:26:56 GMT
Server: Apache/2.4.18 (Ubuntu)
Last-Modified: Sun, 10 Mar 2019 10:17:38 GMT
ETag: \"37-583bac3f3c843\"
Accept-Ranges: bytes
Content-Length: 55
Content-Type: text/html

<html>
<head>
</head>
<body>
It works.
</body>
</html>
"
q
[140095938852608] Thread Finish

BONUS

std::bind is obsolete since C++11, consider using lambdas instead. Since Coliru didn't want to co-operate any more, I'll just post the three changed functions in full:

void Start() {
    tcp::resolver resolver(io_context);
    ba::async_connect(sock,
        resolver.resolve({"unegare.info", "80"}),
        [self=shared_from_this()](error_code ec, tcp::endpoint) { self->OnConnect(ec); });
}

// Connect completion handler: logs the failure, or logs the peer endpoint and
// sends the HTTP request.
void OnConnect(const boost::system::error_code& ec) {
    if (ec) {
        // Report the failure first: asking an unconnected socket for its
        // remote endpoint would throw and hide the real error.
        TRACE(__FUNCTION__ << ": " << ec.message());
        return;
    }

    // Non-throwing overload: a peer that already disconnected only yields a
    // default endpoint in the log instead of an exception.
    boost::system::error_code ep_ec;
    TRACE(__FUNCTION__ << " ep:" << sock.remote_endpoint(ep_ec));

    buf = "GET / HTTP/1.1\r\nHost: unegare.info\r\n\r\n";
    TRACE(std::quoted(buf));
    // async_write keeps writing until the whole request is sent;
    // async_write_some may complete after only part of it.
    ba::async_write(sock, ba::buffer(buf),
        [self=shared_from_this()](error_code ec, size_t bytes_transferred) { self->OnSend(ec, bytes_transferred); });
}

// Write completion handler: logs a failure, otherwise resets buf to 512 zero
// bytes and starts a single read whose handler keeps this object alive.
void OnSend(const boost::system::error_code& ec, std::size_t bytes_transferred) {
    if (!ec) {
        TRACE(__FUNCTION__);
        buf.assign(512, '\0');
        auto on_read = [self=shared_from_this()](error_code read_ec, size_t received) {
            self->OnRecv(read_ec, received);
        };
        sock.async_read_some(ba::buffer(buf), on_read);
        return;
    }

    TRACE(__FUNCTION__ << ": " << ec.message() << " and bytes_transferred: " << bytes_transferred);
}

¹ Does boost::bind() copy parameters by reference or by value?

² Coliru doesn't allow network access

sehe
  • 374,641
  • 47
  • 450
  • 633
  • Coliru seems to have stopped working, so here's the lambda-ized bonus on Wandbox: https://wandbox.org/permlink/dyQOZYTyEAwSxwHd – sehe Dec 22 '19 at 23:04
  • Thank you very much for such a descriptive answer! But I'd like to ask about two more things. The first one is about the `error: use of deleted function ‘boost::asio::io_context::io_context(const boost::asio::io_context&)’ ` compilation error (Boost 1.72), so it seems impossible to use `Client(...): io_context(_io_context)`. And the second one: what does `sock{io_context}` mean? Why the curly brackets? As for a struct? It looks like an improper dependency on `sock`'s implementation, doesn't it? – unegare Dec 23 '19 at 11:30
  • 1
    As you can see, I had the code live on online compilers. You should see what you had differently (I can guess that you forgot the `&` in the member declaration). – sehe Dec 23 '19 at 21:18
  • 1
    The `sock{io_context}` is "uniform initialization syntax" (since c++11) (a kind of popular misnomer), which is actually just calling the constructor like `sock(io_context)`. See also https://stackoverflow.com/questions/18222926/why-is-list-initialization-using-curly-braces-better-than-the-alternatives – sehe Dec 23 '19 at 21:20
  • Yes, I lost the `&` in the member declaration, but is it a good idea to keep an external variable as a pointer-to? In general? For me, without owning, it sounds very strange. Isn't it a violation of some common architectural patterns? – unegare Dec 24 '19 at 08:11
  • 1
    @dronte7: Instead of `lock()`/`unlock()` use `lock_guard` or `unique_lock`; `endl` is always wrong: use `’\n’` and, if you really want the stream to be flushed, `flush`; types with copy/move ctor should have corresponding assignments; I don’t see a place for global objects, not even to protect existing global objects - at least, wrap inside a function to guarantee it is constructed before first use; `catch` by `const&` (unless the exception is rethrown after modification); the code block logging special members is repeated multiple times: encapsulate and call with capturing the reason; ... – Dietmar Kühl Dec 24 '19 at 22:24
  • 1
    @dronte7: ... I don’t see any reason for the loop or its preceding `reserve()`; instead of reading a `char` I’d use `cin.ignore()`; I’d use `jthread` (C++20) or a class derived from `thread` which `join()`s in its destructor; I haven’t used Boost ASIO but I believe its executors are cheap to copy and there isn’t a need to use `shared_ptr` for these but I’m not an expert in that to tell for sure. Fixing these issues (and those sehe mentioned) may reveal additional suspicious code. – Dietmar Kühl Dec 24 '19 at 22:32