1

I wanna make multi client Server using boost::asio and boost::thread_group.

I tried this

//Listener.h 
#pragma once
#include <boost/asio/ip/tcp.hpp>
#include <boost/bind.hpp>
#include <memory>
#include <boost/asio/io_context.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/thread/thread.hpp>
#include "Session.h"
#define PORT_NUMBER 12880

class Listener
{
public:
    Listener(boost::asio::io_context& iosvc, boost::asio::ip::tcp::endpoint& ep);

    void Start_accept();


private:
    boost::asio::io_context &_iosvc;
    boost::asio::ip::tcp::acceptor _acceptor;
    boost::thread_group threadPool;
    void OnAcceptComplete(const boost::system::error_code& error, Session* session);
};

and this is Listener.cpp file

//Listener.cpp
#pragma once
#include <iostream>
#include "Listener.h"
#include "SessionManager.h"

Listener::Listener(boost::asio::io_context& iosvc, boost::asio::ip::tcp::endpoint& ep) :
    _acceptor(iosvc, ep), _iosvc(iosvc) {}


void Listener::Start_accept()
{
    //Create Client Session
    Session* new_session = clientsession::SessionManager::GetInstance()->GenerateSession(_iosvc);
    std::cout << "Listener thread ID : " << std::this_thread::get_id << '\n';
    std::cout << "Listening....." << "\n\n";
    _acceptor.async_accept(*(new_session->Socket()),
        boost::bind(&Listener::OnAcceptComplete, this, boost::asio::placeholders::error, new_session));
}
void Listener::OnAcceptComplete(const boost::system::error_code& error, Session* session)
{
    if (!error) 
    {
        std::cout << "accept completed" << '\n';

        threadPool.create_thread(boost::bind(&boost::asio::io_context::run, &(session->_iocontext)));
        session->_iocontext.post(boost::bind(&Session::StartSession,this)); // error '->*': Cannot convert from'T*' to'Session *'.
        
        threadPool.join_all();
    }

    else
    {
        std::cout << error.message() << '\n';
    }
    Start_accept();
}

but session->_iocontext.post(boost::bind(&Session::StartSession,this)); // error '->': Cannot convert from'T' to'Session *'.

The main statement of Server is as follows.

//Server.cpp
#include <iostream>
#include <boost/thread/thread.hpp>
#include "Listener.h"
#include "SessionManager.h"
using namespace boost::asio;
using namespace clientsession;
SessionManager* _sessionManager;

int main()
{
    try
    {
        boost::asio::io_context iocontext;
        ip::tcp::endpoint ep(ip::address::from_string("127.0.0.1"), PORT_NUMBER); //ipv4, 포트번호
        Listener* _listener = new Listener(iocontext, ep);

        _listener->Start_accept();
        iocontext.run();

    }
    catch (std::exception& e)
    {
        std::cout << e.what() << '\n';
    }
    while (true)
    {
    }
    return 0;

}

I wanna make multithread Server [ one io_context Listener(include accept), multiple io_context Session( for client)]

In other words I want to change the structure in which session threads are created for each client. I don't know if the code I created is correct.

N:1 communication is possible, but session processing works synchronously. So I want to make a thread a session and handle it asynchronously.

dbush
  • 205,898
  • 23
  • 218
  • 273
Lucius__
  • 41
  • 4

1 Answers1

2

In other words I want to change the structure in which session threads are created for each client.

This is an anti-pattern in ASIO. You can, but read this: https://www.boost.org/doc/libs/1_75_0/doc/html/boost_asio/overview/core/threads.html

One natural way to have separate threads in Asio is to have an io_context per thread as well. However, a much more natural way would be to have a strand ("logical thread") per client and a fixed thread pool that reflects the true number of cores on your system.

I don't know if the code I created is correct.

Actually you do, because it doesn't compile.

    session->_iocontext.post(boost::bind(&Session::StartSession,this)); // error '->*': Cannot convert from'T*' to'Session *'.

this is Listener* here. Of course you cannot bind the method of Session to an instance of the Listener class. You wanted the session instead:

post(session->_iocontext, boost::bind(&Session::StartSession, session));

Next up:

  • threadPool.join_all(); appears in OnAcceptedComplete meaning that you'll literally await all threads before even considering accepting any other connection. This means that you're in fact better off with a single threaded, blocking server.

  • you need to INVOKE std::this_thread::get_id: add parentheses std::this_thread::get_id() (reminded me of node addon with cluster get process id returns same id for all forked process's)

  • All your raw pointers and new without delete is going to lead to a nightmare with memory leaks or stale pointers.

    • why is listener not on the stack?

        Listener _listener(iocontext, ep);
        _listener.Start_accept();
      
    • why would Socket() not return a reference? Now you only use it like

        tcp::socket _socket;
        auto* Socket() { return &_socket; }
        // ...
        _acceptor.async_accept(*(new_session->Socket()),
      

      Oof. Simpler:

        auto& Socket() { return _socket; }
        // ....
        _acceptor.async_accept(new_session->Socket(),
      
    • etc.

  • You post StartSession onto the io service:

     post(/*some service*/, boost::bind(&Session::StartSession, session));
    

    Indeed this makes sense only with a private service for that thread or using a strand. With strands it seems redundant still because this always happens on a newly accepted session, so no other thread can possibly know about it.

    The exception is when you rely on thread local storage?

    Or maybe your implementation of GenerateSession does not result unique sessions?

Minimal Fixes

Here are my minimal fixes. I suggest NOT using multiple threads (you're doing asynchronous IO anyways).

#define PORT_NUMBER 12880
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <iomanip>
#include <iostream>
#include <memory>
using boost::asio::ip::tcp;
using boost::system::error_code;

struct Session : std::enable_shared_from_this<Session> {
    template <typename Executor>
    Session(Executor executor) : _socket(make_strand(executor)) {}

    auto& Socket() { return _socket; }

    void StartSession() {
        std::cout << "Session: " << this << " StartSession" << std::endl;
        async_read(_socket, boost::asio::dynamic_buffer(received),
           [self = shared_from_this()](auto ec, size_t tx) { self->on_received(ec, tx); });
    };

  private:
    tcp::socket _socket;
    std::string received;
    void on_received(error_code ec, size_t /*bytes_transferred*/) {
        std::cout << "Session: " << this << " on_received " << ec.message() << " " << std::quoted(received) << std::endl;
    }
};

using SessionPtr = std::shared_ptr<Session>;

namespace clientsession {
    struct SessionManager {
        static SessionManager* GetInstance() {
            static SessionManager instance;
            return &instance;
        }
        template <typename... T> SessionPtr GenerateSession(T&&... args) {
            return std::make_shared<Session>(std::forward<T>(args)...);
        }
    };
} // namespace clientsession

class Listener {
    public:
        Listener(boost::asio::io_context& iosvc, tcp::endpoint ep)
            : _iosvc(iosvc), _acceptor(iosvc, ep) {}

        void Start_accept() {
            // Create Client Session
            auto new_session = clientsession::SessionManager::GetInstance()
                ->GenerateSession(_iosvc.get_executor());

            std::cout << "Listening.....\n\n";
            _acceptor.async_accept(new_session->Socket(),
                    boost::bind(&Listener::OnAcceptComplete, this,
                        boost::asio::placeholders::error,
                        new_session));
        }

    private:
        boost::asio::io_context& _iosvc;
        tcp::acceptor _acceptor;
        void OnAcceptComplete(error_code error, SessionPtr session) {
            if (!error) {
                std::cout << "accept completed\n";
                session->StartSession();
                Start_accept();
            } else {
                std::cout << error.message() << '\n';
            }
        }
};

int main() {
    try {
        boost::asio::io_context iocontext;
        Listener listener(iocontext, {{}, PORT_NUMBER}); // ipv4, port number
        listener.Start_accept();

        iocontext.run();
    } catch (std::exception& e) {
        std::cout << e.what() << '\n';
    }
}

When tested with a bunch of simultaneous clients:

for a in {1..10};
do
    (sleep 1.$RANDOM; echo -n "hellow world $RANDOM") |
         netcat -w 2 localhost 12880&
done; 
time wait

Prints out something like

Listening.....

accept completed
Session: 0x56093453c1b0 StartSession
Listening.....

accept completed
Session: 0x56093453de60 StartSession
Listening.....

accept completed
Session: 0x56093453e3c0 StartSession
Listening.....

accept completed
Session: 0x56093453e920 StartSession
Listening.....

accept completed
Session: 0x56093453ee80 StartSession
Listening.....

accept completed
Session: 0x56093453f3e0 StartSession
Listening.....

accept completed
Session: 0x56093453f940 StartSession
Listening.....

accept completed
Session: 0x56093453fea0 StartSession
Listening.....

accept completed
Session: 0x560934540400 StartSession
Listening.....

accept completed
Session: 0x560934540960 StartSession
Listening.....

Session: 0x56093453f940 on_received End of file "hellow world 10149"
Session: 0x56093453fea0 on_received End of file "hellow world 22492"
Session: 0x560934540400 on_received End of file "hellow world 29539"
Session: 0x56093453c1b0 on_received End of file "hellow world 20494"
Session: 0x56093453ee80 on_received End of file "hellow world 24735"
Session: 0x56093453de60 on_received End of file "hellow world 8071"
Session: 0x560934540960 on_received End of file "hellow world 27606"
Session: 0x56093453e920 on_received End of file "hellow world 534"
Session: 0x56093453e3c0 on_received End of file "hellow world 21676"
Session: 0x56093453f3e0 on_received End of file "hellow world 24362"
sehe
  • 374,641
  • 47
  • 450
  • 633
  • For some inspiration/alternatives to managing the Session lifetime(s) you can read the links here https://stackoverflow.com/a/65023952/85371 – sehe Jan 18 '21 at 16:30
  • Improved example to highlight that the sessions are simultaneous – sehe Jan 18 '21 at 18:50
  • Your sincere answer has helped me a lot. I also found "this -> Listener -> session" mistake :) I got a lot of inspiration. Thank you – Lucius__ Jan 19 '21 at 11:27