
I have implemented a UDP session for use in a multi-threaded environment.

    using RawDataArray=std::array<unsigned char,65000>;
    
    class StaticBuffer
    {
    private:
        RawDataArray                            m_data;
        std::size_t                             m_n_avail;
    public:
    
        StaticBuffer():m_data(),m_n_avail(0){}
        StaticBuffer(std::size_t n_bytes){m_n_avail=n_bytes;}
        StaticBuffer(const StaticBuffer& other)
        {
            std::cout<<"ctor cpy\n";
            m_data=other.m_data;
            m_n_avail=other.m_n_avail;
        }
        StaticBuffer(const StaticBuffer& other,std::size_t n_bytes)
        {
            std::cout<<"ctor cpy\n";
            m_data=other.m_data;
            m_n_avail=n_bytes;
        }
        StaticBuffer(const RawDataArray& data,std::size_t n_bytes)
        {
            std::cout<<"ctor static buff\n";
            m_data=data;
            m_n_avail=n_bytes;
        }
        void set_size(int n)
        {
            m_n_avail=n;
        }
        void set_max_size(){m_n_avail=m_data.size();}
        std::size_t max_size()const {return m_data.size();}
        unsigned char& operator[](std::size_t i){return m_data[i];}
        const unsigned char& operator[] (std::size_t i)const{return m_data[i];}
        StaticBuffer& operator=(const StaticBuffer& other)
        {
            if (this == &other)
                return *this;
            m_data = other.m_data;
            m_n_avail = other.m_n_avail;
            return *this;
        }
        void push_back(unsigned char val)
        {
            if (m_n_avail<m_data.size())
            {
                m_data[m_n_avail]=val;
                ++m_n_avail;    // advance the logical size; otherwise every push_back overwrites index 0
            }else
                throw "Out of memory";
        }
        void reset(){m_n_avail=0;}
    
        unsigned char* data(){return m_data.data();}
        const unsigned char* data()const {return m_data.data();}
        std::size_t size()const{return m_n_avail;}
    
        ~StaticBuffer(){}
    };
    
        class UDPSeassion;
        using DataBuffer = StaticBuffer;
        using DataBufferPtr=std::unique_ptr<DataBuffer>;
        using ExternalReadHandler=std::function<void(DataBufferPtr)>;
        
        class UDPSeassion:public std::enable_shared_from_this<UDPSeassion>
        {
        private:
            asio::io_context&           m_ctx;
            asio::ip::udp::socket       m_socket;
            asio::ip::udp::endpoint     m_endpoint;
            std::string                 m_addr;
            unsigned short              m_port;
        
            asio::io_context::strand    m_send_strand;
            std::deque<DataBufferPtr>   m_dq_send;
        
        
            asio::io_context::strand    m_rcv_strand;
            DataBufferPtr               m_rcv_data;
        
            ExternalReadHandler         external_rcv_handler;
        
        private:
            void do_send_data_from_dq()
            {
                if (m_dq_send.empty())
                    return;
        
                m_socket.async_send_to(
                            asio::buffer(m_dq_send.front()->data(),m_dq_send.front()->size()),
                            m_endpoint,
                            asio::bind_executor(m_send_strand,[this](const boost::system::error_code& er, std::size_t bytes_transferred){
                    if (!er)
                    {
                        m_dq_send.pop_front();
                        do_send_data_from_dq();
        
                    }else
                    {
                        //post to logger
                    }
                }));
            }
        
            void do_read(const boost::system::error_code& er, std::size_t bytes_transferred)
            {
                if (!er)
                {
                    m_rcv_data->set_size(bytes_transferred);
                    asio::post(m_ctx,[this,data=std::move(m_rcv_data)]()mutable{ external_rcv_handler(std::move(data));});
                    m_rcv_data=std::make_unique<DataBuffer>();
                    m_rcv_data->set_max_size();
                    async_read();
                }
            }
        
        public:
        
            UDPSeassion(asio::io_context& ctx,const std::string& addr, unsigned short port):
                m_ctx(ctx),
                m_socket(ctx),
                m_endpoint(asio::ip::address::from_string(addr),port),
                m_addr(addr),
                m_port(port),
                m_send_strand(ctx),
                m_dq_send(),
                m_rcv_strand(ctx),
                m_rcv_data(std::make_unique<DataBuffer>(65000))
        
            {}
            ~UDPSeassion(){}
        
            const std::string& get_host()const{return m_addr;}
            unsigned short get_port(){return m_port;}
            template<typename ExternalReadHandlerCallable>
            void set_read_data_headnler(ExternalReadHandlerCallable&& handler)
            {
                external_rcv_handler=std::forward<ExternalReadHandlerCallable>(handler);
            }
            void start()
            {
                m_socket.open(asio::ip::udp::v4());
                async_read();
            }
        
            void async_read()
            {
                m_socket.async_receive_from(
                            asio::buffer(m_rcv_data->data(),m_rcv_data->size()),
                            m_endpoint,
                            asio::bind_executor(m_rcv_strand,std::bind(&UDPSeassion::do_read,this,std::placeholders::_1,std::placeholders::_2) )
                            );
            }
        
            void async_send(DataBufferPtr pData)
            {
                asio::post(m_ctx,
                           asio::bind_executor(m_send_strand,[this,pDt=std::move(pData)]()mutable{
                                                                                                m_dq_send.emplace_back(std::move(pDt));
                                                                                                if (m_dq_send.size()==1)
                                                                                                    do_send_data_from_dq();
                                                                                                }));
            }
        };

void handler_read(DataBufferPtr pdata)
{
    // decoding raw_data -> decod_data
    // lock mutext
    // queue.push_back(decod_data)
    // unlock mutext

    //for view pdata
    std::stringstream ss;
    ss<<"thread handler: "<<std::this_thread::get_id()<<" "<<pdata->data()<<" "<<pdata->size()<<std::endl;
    std::cout<<ss.str()<<std::endl;
}
int main()
{
    asio::io_context ctx;
    //auto work_guard = asio::make_work_guard(ctx);
    std::cout<<"MAIN thread: "<<std::this_thread::get_id()<<std::endl;
    StaticBuffer b{4};
    b[0]='A';
    b[1]='B';
    b[2]='C';
    b[3]='\n';

    UDPSeassion client(ctx,"127.0.0.1",11223);
    client.set_read_data_headnler(handler_read);
    client.start();

    std::vector<std::thread> threads;

    for (int i=0;i<3;++i)
    {
        threads.emplace_back([&](){
            std::stringstream ss;
            ss<<"run thread: "<<std::this_thread::get_id()<<std::endl;
            std::cout<<ss.str();
            ctx.run();
            std::cout<<"end thread\n";
        }
        );
    }

    client.async_send(std::make_unique<StaticBuffer>(b));
    ctx.run();

    for (auto& t:threads)
        t.join();

    return 1;
}

In the code above, the main emphasis is on the UDPSeassion class; StaticBuffer is only written far enough to provide the basic functionality it needs. I have some questions:

  1. Suppose this class will be built into a system that runs at ~100 Hz, i.e. every 10 ms the system sends its state through the client. 1.1 Is this done properly for a multi-threaded environment, and how efficient is this implementation? 1.2 How efficient is a client implementation that contains only one internal thread serving both reading and writing (an example would be appreciated)?
  2. Is transferring the buffer between tasks like this correct (std::move(unique_ptr_data))?
  3. In practice, how many threads are given to the client to process reads and writes?
  4. And for a TCP client?

I will be very grateful for detailed answers to my questions. Thank you very much!

Lucifer
    "How efficient is a client implementation that contains only one thread within itself that serves reading and writing?" - it can be extremely efficient. In fact it can avoid most overhead. If your process is IO-bound it's likely you won't need more. – sehe Aug 04 '23 at 17:56

1 Answer


I'd simplify a ton.

  • you "use" enable_shared_from_this but none of the asynchronous operations capture shared_from_this. In fact, you don't even allocate UDPSession shared, so it would be Undefined Behaviour to use shared_from_this at all.

  • no-op destructors are implied. If you must declare them, = default them

  • io_context::strand (the type of m_send_strand/m_rcv_strand) is deprecated - use strand<> (e.g. make_strand) instead

  • why is there a separate strand for send/receive? Sure, 1 read operation is allowed to overlap 1 write operation, but you still cannot access the shared objects (like m_socket) without proper synchronization

  • you have strands but seem to erroneously not post to them where relevant (e.g. post(m_ctx, bind_executor(m_send_strand, ....)) is conflicting); see the sketch after this list

  • you have a laborious buffer type that /seemingly/ aims to avoid allocation, but you wrap it in a unique_ptr anyways ¯\_(ツ)_/¯

  • set_read_data_handler doesn't need to be a template. Since you're erasing to std::function anyways, there's zero benefit over just using:

     void set_read_data_handler(ExternalReadHandler handler) {
          external_rcv_handler = std::move(handler);
     }
    
  • You have repeated magic constants (e.g. 65000)

  • You seem to be missing a socket bind() call
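
A minimal, self-contained sketch of the strand points above (this is not the UDPSession code; the deque merely stands in for m_dq_send): make_strand replaces the deprecated io_context::strand, and work is posted to the strand itself instead of post(m_ctx, bind_executor(...)):

#include <boost/asio.hpp>
#include <deque>
#include <iostream>

namespace asio = boost::asio;

int main() {
    asio::io_context ctx;

    auto strand = asio::make_strand(ctx); // non-deprecated replacement for io_context::strand

    std::deque<int> send_queue; // stand-in for m_dq_send

    for (int i = 0; i < 3; ++i)
        asio::post(strand, [&, i] {  // post to the strand, not to the io_context
            send_queue.push_back(i); // serialized: these handlers never run concurrently
            std::cout << "queued " << i << ", queue size " << send_queue.size() << "\n";
        });

    ctx.run();
}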

In short I'd replace the buffer with something sensible:

using StaticBuffer = boost::container::static_vector<uint8_t, 65000>;
static_assert(StaticBuffer::static_capacity == 65000);

Since you seem to expect a text protocol, chances are your average message is (much) smaller, so I reckon it may be much faster to just use std::string or even boost::container::small_vector<...>.
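
To illustrate the small-buffer idea (320 here is just an assumed inline capacity, not a measured message size): small_vector keeps short messages in place and only allocates once a message outgrows that capacity.

#include <boost/container/small_vector.hpp>
#include <cstdint>
#include <iostream>

// 320 is an assumed inline capacity, for illustration only
using SmallBuffer = boost::container::small_vector<std::uint8_t, 320>;

int main() {
    SmallBuffer msg{'A', 'B', 'C', '\n'}; // fits inline, no heap allocation
    std::cout << "size " << msg.size() << ", capacity " << msg.capacity() << "\n";

    msg.resize(1000); // exceeds the inline capacity, now heap-allocated
    std::cout << "size " << msg.size() << ", capacity " << msg.capacity() << "\n";
}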

Not really required but to allow for elegant, asio-standard use:

using asio::buffer;
static inline auto buffer(StaticBuffer const& b) { return boost::asio::buffer(b.data(), b.size()); }
static inline auto buffer(StaticBuffer& b) { return boost::asio::buffer(b.data(), b.size()); }

See a much simplified version Live On Coliru

#include <boost/asio.hpp>
#include <boost/container/static_vector.hpp>
#include <deque>
#include <iomanip>
#include <iostream>
#include <list>
#include <thread>

namespace { // user-friendly logging
    static std::mutex      s_console_mx;
    static std::atomic_int t_id_gen = 0;
    thread_local int       t_id     = ++t_id_gen;

    template <typename... T> static inline void trace(T const&... args) {
        std::lock_guard lk(s_console_mx);
        ((std::cout << "th:" << t_id << " ") << ... << args) << std::endl;
    }
} // namespace

namespace asio = boost::asio;
using asio::ip::udp;

using StaticBuffer = boost::container::static_vector<uint8_t, 65000>;
static_assert(StaticBuffer::static_capacity == 65000);

// not really required but to allow for elegant, asio-standard use:
using asio::buffer;
static inline auto buffer(StaticBuffer const& b) { return boost::asio::buffer(b.data(), b.size()); }
static inline auto buffer(StaticBuffer& b) { return boost::asio::buffer(b.data(), b.size()); }

using ExternalReadHandler = std::function<void(StaticBuffer&&)>;

class UDPSession {
  private:
    using error_code = boost::system::error_code;
    asio::any_io_executor m_ex;
    std::string           m_addr;
    uint16_t              m_port;

    udp::endpoint m_endpoint{asio::ip::address::from_string(m_addr), m_port};
    udp::socket   m_socket{make_strand(m_ex)};

    std::deque<StaticBuffer> m_dq_send;
    StaticBuffer             m_rcv_data;
    ExternalReadHandler      external_rcv_handler;

  public:
    UDPSession(asio::any_io_executor ex, std::string const& addr, uint16_t port)
        : m_ex(ex)
        , m_addr(addr)
        , m_port(port) {}

    std::string const& get_host() const { return m_addr; }
    uint16_t           get_port() { return m_port; }
    void set_read_data_handler(ExternalReadHandler handler) { external_rcv_handler = std::move(handler); }

    void start() {
        m_socket.open(udp::v4());
        m_socket.bind(m_endpoint);
        do_read();
    }

    void send(StaticBuffer data) {
        asio::post(m_socket.get_executor(), [this, d = std::move(data)]() mutable {
            m_dq_send.emplace_back(std::move(d));
            if (m_dq_send.size() == 1)
                send_loop();
        });
    }

  private:
    void do_read() {
        m_rcv_data.assign(m_rcv_data.static_capacity, '\0');
        m_socket.async_receive_from(
            buffer(m_rcv_data), m_endpoint,
            std::bind(&UDPSession::on_read, this, std::placeholders::_1, std::placeholders::_2));
    }

    void on_read(error_code er, size_t bytes_transferred) {
        if (!er) {
            m_rcv_data.resize(bytes_transferred);
            asio::post(m_ex, [this, data = std::move(m_rcv_data)]() mutable {
                external_rcv_handler(std::move(data));
            });
            do_read();
        }
    }

    void send_loop() {
        if (m_dq_send.empty())
            return;

        m_socket.async_send_to(buffer(m_dq_send.front()), m_endpoint,
                               [this](error_code er, size_t /*bytes_transferred*/) {
                                   if (!er) {
                                       m_dq_send.pop_front();
                                       send_loop();
                                   } // else { /* post to logger */ }
                               });
    }
};

void handler_read(StaticBuffer&& pdata) {
    if (!pdata.empty()) {
        std::string msg(pdata.begin(), pdata.end() - 1); // omit '\n'
        trace("thread handler: ", pdata.size(), " as text: ", quoted(msg));
    }
}

int main() {
    asio::io_context ctx;
    auto work_guard = asio::make_work_guard(ctx);

    trace("Main thread");

    std::list<std::thread> threads;

    for (int i = 0; i < 3; ++i)
        threads.emplace_back([&]() {
            trace("START");
            ctx.run();
            trace("END");
        });

    UDPSession client(ctx.get_executor(), "127.0.0.1", 11223);
    client.set_read_data_handler(handler_read);
    client.start();
    client.send({'A', 'B', 'C', '\n'});

    work_guard.reset();

    for (auto& t : threads)
        t.join();
}

The live demo on Coliru "eats" words from main.cpp. Here's a local dictionary demo:

(screenshot of the local demo run)

EXTRA: Thread Pool, shared_from_this

You might have noticed I changed to any_io_executor instead of io_context&. That way you can easily switch to asio::thread_pool instead of doing it manually (poorly¹).
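
A minimal sketch of what that buys you (the Session struct here is just a stand-in, not the UDPSession from this answer): the same constructor accepts executors from an io_context or a thread_pool, because any_io_executor type-erases them.

#include <boost/asio.hpp>
#include <utility>

namespace asio = boost::asio;

struct Session {
    asio::any_io_executor ex; // type-erased executor
    explicit Session(asio::any_io_executor e) : ex(std::move(e)) {}
};

int main() {
    asio::io_context  ioc;
    asio::thread_pool pool(4);

    Session a(ioc.get_executor());  // manual thread management, as in the original code
    Session b(pool.get_executor()); // thread pool, no change to Session needed

    pool.join();
}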

Let's also re-instate shared_from_this, but correctly this time.

For simplicity I've used a static buffer ONLY for the receive buffer (because that's how datagram protocols roll), and just used vector (or small_vector) for the DataBuffer.

Live On Coliru

#include <boost/asio.hpp>
#include <deque>
#include <iomanip>
#include <iostream>

namespace { // user-friendly logging
    static std::mutex      s_console_mx;
    static std::atomic_int t_id_gen = 0;
    thread_local int       t_id     = ++t_id_gen;

    template <typename... T> static inline void trace(T const&... args) {
        std::lock_guard lk(s_console_mx);
        ((std::cout << "th:" << t_id << " ") << ... << args) << std::endl;
    }
} // namespace

namespace asio = boost::asio;
using asio::ip::udp;

//using DataBuffer = boost::container::small_vector<uint8_t, 320>; // e.g. median length is 320
using DataBuffer = std::vector<uint8_t>;
using ExternalReadHandler = std::function<void(DataBuffer&&)>;

class UDPSession : public std::enable_shared_from_this<UDPSession> {
  private:
    using error_code = boost::system::error_code;
    asio::any_io_executor m_ex;
    std::string           m_addr;
    uint16_t              m_port;

    udp::endpoint m_endpoint{asio::ip::address::from_string(m_addr), m_port};
    udp::socket   m_socket{make_strand(m_ex)};

    std::deque<DataBuffer>     m_dq_send;
    std::array<uint8_t, 65000> m_rcv_data;
    ExternalReadHandler        external_rcv_handler;

  public:
    UDPSession(asio::any_io_executor ex, std::string const& addr, uint16_t port)
        : m_ex(ex)
        , m_addr(addr)
        , m_port(port) {}

    std::string const& get_host() const { return m_addr; }
    uint16_t           get_port() { return m_port; }
    void set_read_data_handler(ExternalReadHandler handler) { external_rcv_handler = std::move(handler); }

    void start() {
        m_socket.open(udp::v4());
        m_socket.bind(m_endpoint);
        do_read();
    }

    void send(DataBuffer data) {
        asio::post(m_socket.get_executor(), [this, self = shared_from_this(), d = std::move(data)]() mutable {
            m_dq_send.emplace_back(std::move(d));
            if (m_dq_send.size() == 1)
                send_loop();
        });
    }

  private:
    void do_read() {
        using namespace std::placeholders;
        m_socket.async_receive_from( //
            asio::buffer(m_rcv_data), m_endpoint,
            std::bind(&UDPSession::on_read, shared_from_this(), _1, _2));
    }

    void on_read(error_code er, size_t bytes_transferred) {
        if (!er) {
            asio::post(
                m_ex,
                [this, self=shared_from_this(),
                 data = DataBuffer(m_rcv_data.data(), m_rcv_data.data() + bytes_transferred)]() mutable {
                    external_rcv_handler(std::move(data));
                });
            do_read();
        }
    }

    void send_loop() {
        if (m_dq_send.empty())
            return;

        m_socket.async_send_to( //
            asio::buffer(m_dq_send.front()), m_endpoint,
            [this, self = shared_from_this()](error_code er, size_t /*bytes_transferred*/) {
                if (!er) {
                    m_dq_send.pop_front();
                    send_loop();
                } // else { /* post to logger */ }
            });
    }
};

void handler_read(DataBuffer&& pdata) {
    if (!pdata.empty()) {
        std::string msg(pdata.begin(), pdata.end() - 1); // omit '\n'
        trace("thread handler: ", pdata.size(), " as text: ", quoted(msg));
    }
}

int main() {
    trace("Main thread");

    asio::thread_pool ctx(4);

    {
        auto client = std::make_shared<UDPSession>(ctx.get_executor(), "127.0.0.1", 11223);
        client->set_read_data_handler(handler_read);
        client->start();
        client->send({'A', 'B', 'C', '\n'});
    } // client stays alive through shared ownership

    ctx.join();
}

As icing on the cake, you can template the entire thing on the concrete executor type and avoid type-erasing the executor:

template <typename Executor>
class UDPSession : public std::enable_shared_from_this<UDPSession<Executor> > {
    using socket_t   = asio::basic_datagram_socket<udp, asio::strand<Executor>>;

See it Live On Coliru

#include <boost/asio.hpp>
#include <deque>
#include <iomanip>
#include <iostream>

namespace { // user-friendly logging
    static std::mutex      s_console_mx;
    static std::atomic_int t_id_gen = 0;
    thread_local int       t_id     = ++t_id_gen;

    template <typename... T> static inline void trace(T const&... args) {
        std::lock_guard lk(s_console_mx);
        ((std::cout << "th:" << t_id << " ") << ... << args) << std::endl;
    }
} // namespace

namespace asio = boost::asio;
using asio::ip::udp;

//using DataBuffer = boost::container::small_vector<uint8_t, 320>; // e.g. median length is 320
using DataBuffer = std::vector<uint8_t>;
using ExternalReadHandler = std::function<void(DataBuffer&&)>;

template <typename Executor>
class UDPSession : public std::enable_shared_from_this<UDPSession<Executor> > {
    using socket_t   = asio::basic_datagram_socket<udp, asio::strand<Executor>>;
    using error_code = boost::system::error_code;
    Executor    m_ex;
    std::string m_addr;
    uint16_t    m_port;

    udp::endpoint m_endpoint{asio::ip::address::from_string(m_addr), m_port};
    socket_t      m_socket{make_strand(m_ex)};

    std::deque<DataBuffer>     m_dq_send;
    std::array<uint8_t, 65000> m_rcv_data;
    ExternalReadHandler        external_rcv_handler;

    using std::enable_shared_from_this<UDPSession>::shared_from_this;
  public:
    UDPSession(Executor ex, std::string const& addr, uint16_t port)
        : m_ex(ex)
        , m_addr(addr)
        , m_port(port) {}

    std::string const& get_host() const { return m_addr; }
    uint16_t           get_port() { return m_port; }
    void set_read_data_handler(ExternalReadHandler handler) { external_rcv_handler = std::move(handler); }

    void start() {
        m_socket.open(udp::v4());
        m_socket.bind(m_endpoint);
        do_read();
    }

    void send(DataBuffer data) {
        asio::post(m_socket.get_executor(), [this, self = shared_from_this(), d = std::move(data)]() mutable {
            m_dq_send.emplace_back(std::move(d));
            if (m_dq_send.size() == 1)
                send_loop();
        });
    }

  private:
    void do_read() {
        using namespace std::placeholders;
        m_socket.async_receive_from( //
            asio::buffer(m_rcv_data), m_endpoint,
            std::bind(&UDPSession::on_read, shared_from_this(), _1, _2));
    }

    void on_read(error_code er, size_t bytes_transferred) {
        if (!er) {
            auto f = m_rcv_data.data(), l = f + bytes_transferred;
            asio::post(m_ex, [self = shared_from_this(), data = DataBuffer(f, l)]() mutable {
                self->external_rcv_handler(std::move(data));
            });
            do_read();
        }
    }

    void send_loop() {
        if (m_dq_send.empty())
            return;

        m_socket.async_send_to( //
            asio::buffer(m_dq_send.front()), m_endpoint,
            [this, self = shared_from_this()](error_code er, size_t /*bytes_transferred*/) {
                if (!er) {
                    m_dq_send.pop_front();
                    send_loop();
                } // else { /* post to logger */ }
            });
    }
};

void handler_read(DataBuffer&& pdata) {
    if (!pdata.empty()) {
        std::string msg(pdata.begin(), pdata.end() - 1); // omit '\n'
        trace("thread handler: ", pdata.size(), " as text: ", quoted(msg));
    }
}

int main() {
    trace("Main thread");

    using Ex = asio::thread_pool::executor_type;
    asio::thread_pool ctx(4);

    {
        auto client = std::make_shared<UDPSession<Ex> >(ctx.get_executor(), "127.0.0.1", 11223);
        client->set_read_data_handler(handler_read);
        client->start();
        client->send({'A', 'B', 'C', '\n'});
    } // client stays alive through shared ownership

    ctx.join();
}

Another local demo:

(screenshot of another local demo run)


¹ you needed at least exception handling in the runner threads: Should the exception thrown by boost::asio::io_service::run() be caught?
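
The pattern that footnote refers to, as a standalone sketch (the throwing handler is purely illustrative): wrap run() in a try/catch in every runner thread and call run() again after an exception, otherwise the first handler exception takes the thread down with it.

#include <boost/asio.hpp>
#include <iostream>
#include <stdexcept>
#include <thread>
#include <vector>

namespace asio = boost::asio;

int main() {
    asio::io_context ctx;
    auto work_guard = asio::make_work_guard(ctx);

    asio::post(ctx, [] { throw std::runtime_error("handler failed"); }); // illustrative failing handler

    std::vector<std::thread> threads;
    for (int i = 0; i < 3; ++i)
        threads.emplace_back([&ctx] {
            for (;;) {
                try {
                    ctx.run(); // returns normally once the context runs out of work
                    break;
                } catch (std::exception const& e) {
                    std::cerr << "handler threw: " << e.what() << ", resuming\n";
                }
            }
        });

    work_guard.reset(); // allow run() to return when all work is done
    for (auto& t : threads)
        t.join();
}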

sehe
  • Thanks a lot for the broad answer. It seems to me that this example should be added to the library documentation. I have some questions 1. I thought it was thread safe to write to and read from a socket from different threads, but not safe to write (read) from different threads at the same time. 2. If I want to use my own data buffer, can I use a smart pointer to pass the buffer between tasks on different threads? 3. I wanted to remove the code from the std::function. I added a template to allow the compiler to construct the type. I think it's better to use a template for a handler – Lucifer Aug 05 '23 at 11:13
  • 1. Yes but look at how I worded that in my answer. Distinct IO objects are not threadsafe. 2. Sure, just be aware of the potential cost. I think for the pattern shown in the code my simpler approach will usually be more efficient. 3. Granted. It *can* sometimes be more efficient. – sehe Aug 07 '23 at 21:01