I have implemented a UDP session class that is intended to be used in a multi-threaded environment.
/// Fixed-capacity raw storage backing a StaticBuffer (65000 bytes).
using RawDataArray=std::array <unsigned char,65000>;

/// Fixed-capacity byte buffer that tracks how many leading bytes are in use.
///
/// The storage is a RawDataArray held by value, so every copy of a
/// StaticBuffer copies the whole array. size() reports the number of bytes in
/// use and never exceeds max_size().
class StaticBuffer
{
private:
    RawDataArray m_data;
    std::size_t m_n_avail;

    /// Limits a requested size to the capacity of the backing array so that
    /// data()/size() can never describe a range larger than the storage.
    static std::size_t clamp_to_capacity(std::size_t n)
    {
        return n<std::tuple_size<RawDataArray>::value ? n : std::tuple_size<RawDataArray>::value;
    }
public:
    /// Creates an empty, zero-filled buffer.
    StaticBuffer():m_data(),m_n_avail(0){}

    /// Creates a zero-filled buffer whose first n_bytes bytes count as in use.
    /// @param n_bytes requested size; values above max_size() are clamped.
    StaticBuffer(std::size_t n_bytes):m_data(),m_n_avail(clamp_to_capacity(n_bytes)){}

    /// Copies storage and size from other (traces the copy to std::cout).
    StaticBuffer(const StaticBuffer& other):m_data(other.m_data),m_n_avail(other.m_n_avail)
    {
        std::cout<<"ctor cpy\n";
    }

    /// Copies the storage of other but uses n_bytes (clamped to max_size()) as the size.
    StaticBuffer(const StaticBuffer& other,std::size_t n_bytes):m_data(other.m_data),m_n_avail(clamp_to_capacity(n_bytes))
    {
        std::cout<<"ctor cpy\n";
    }

    /// Copies a raw array and uses n_bytes (clamped to max_size()) as the size.
    StaticBuffer(const RawDataArray& data,std::size_t n_bytes):m_data(data),m_n_avail(clamp_to_capacity(n_bytes))
    {
        std::cout<<"ctor static buff\n";
    }

    /// Sets the number of bytes in use.
    /// @param n new size; negative values become 0, values above max_size() are clamped.
    void set_size(int n)
    {
        if (n<0)
            m_n_avail=0;
        else
            m_n_avail=clamp_to_capacity(static_cast<std::size_t>(n));
    }

    /// Marks the whole storage as in use (size() == max_size()).
    void set_max_size(){m_n_avail=m_data.size();}

    /// @return the capacity of the backing array in bytes.
    std::size_t max_size()const {return m_data.size();}

    /// Unchecked element access; i must be below max_size().
    unsigned char& operator[](std::size_t i){return m_data[i];}
    const unsigned char& operator[] (std::size_t i)const{return m_data[i];}

    /// Copy-assigns storage and size; self-assignment is a no-op.
    StaticBuffer& operator=(const StaticBuffer& other)
    {
        if (this != &other)
        {
            m_data = other.m_data;
            m_n_avail = other.m_n_avail;
        }
        return *this;
    }

    /// Appends one byte after the bytes currently in use and grows size() by one.
    /// @throws const char* ("Out of memory") when the buffer is already full;
    ///         the thrown type is kept as-is for existing catch sites.
    void push_back(unsigned char val)
    {
        if (m_n_avail>=m_data.size())
            throw "Out of memory";
        // Advance the size as well: without the increment every push would
        // overwrite the same slot and size() would never change.
        m_data[m_n_avail++]=val;
    }

    /// Marks the buffer as empty; the stored bytes are left untouched.
    void reset(){m_n_avail=0;}

    /// @return pointer to the first byte of the storage.
    unsigned char* data(){return m_data.data();}
    const unsigned char* data()const {return m_data.data();}

    /// @return number of bytes currently in use.
    std::size_t size()const{return m_n_avail;}

    ~StaticBuffer(){}
};
class UDPSeassion;
using DataBuffer = StaticBuffer;
using DataBufferPtr=std::unique_ptr<DataBuffer>;
using ExternalReadHandler=std::function<void(DataBufferPtr)>;
class UDPSeassion:public std::enable_shared_from_this<UDPSeassion>
{
private:
asio::io_context& m_ctx;
asio::ip::udp::socket m_socket;
asio::ip::udp::endpoint m_endpoint;
std::string m_addr;
unsigned short m_port;
asio::io_context::strand m_send_strand;
std::deque<DataBufferPtr> m_dq_send;
asio::io_context::strand m_rcv_strand;
DataBufferPtr m_rcv_data;
ExternalReadHandler external_rcv_handler;
private:
void do_send_data_from_dq()
{
if (m_dq_send.empty())
return;
m_socket.async_send_to(
asio::buffer(m_dq_send.front()->data(),m_dq_send.front()->size()),
m_endpoint,
asio::bind_executor(m_send_strand,[this](const boost::system::error_code& er, std::size_t bytes_transferred){
if (!er)
{
m_dq_send.pop_front();
do_send_data_from_dq();
}else
{
//post to loggger
}
}));
}
void do_read(const boost::system::error_code& er, std::size_t bytes_transferred)
{
if (!er)
{
m_rcv_data->set_size(bytes_transferred);
asio::post(m_ctx,[this,data=std::move(m_rcv_data)]()mutable{ external_rcv_handler(std::move(data));});
m_rcv_data=std::make_unique<DataBuffer>();
m_rcv_data->set_max_size();
async_read();
}
}
public:
UDPSeassion(asio::io_context& ctx,const std::string& addr, unsigned short port):
m_ctx(ctx),
m_socket(ctx),
m_endpoint(asio::ip::address::from_string(addr),port),
m_addr(addr),
m_port(port),
m_send_strand(ctx),
m_dq_send(),
m_rcv_strand(ctx),
m_rcv_data(std::make_unique<DataBuffer>(65000))
{}
~UDPSeassion(){}
const std::string& get_host()const{return m_addr;}
unsigned short get_port(){return m_port;}
template<typename ExternalReadHandlerCallable>
void set_read_data_headnler(ExternalReadHandlerCallable&& handler)
{
external_rcv_handler=std::forward<ExternalReadHandlerCallable>(handler);
}
void start()
{
m_socket.open(asio::ip::udp::v4());
async_read();
}
void async_read()
{
m_socket.async_receive_from(
asio::buffer(m_rcv_data->data(),m_rcv_data->size()),
m_endpoint,
asio::bind_executor(m_rcv_strand,std::bind(&UDPSeassion::do_read,this,std::placeholders::_1,std::placeholders::_2) )
);
}
void async_send(DataBufferPtr pData)
{
asio::post(m_ctx,
asio::bind_executor(m_send_strand,[this,pDt=std::move(pData)]()mutable{
m_dq_send.emplace_back(std::move(pDt));
if (m_dq_send.size()==1)
do_send_data_from_dq();
}));
}
};
/// Example receive callback: prints the handling thread id, the payload and
/// its size to std::cout.
/// @param pdata received datagram; ownership is taken. A null pointer is ignored.
void handler_read(DataBufferPtr pdata)
{
    // decoding raw_data -> decod_data
    // lock mutext
    // queue.push_back(decod_data)
    // unlock mutext
    //for view pdata
    if (!pdata)
        return;
    std::stringstream ss;
    ss<<"thread handler: "<<std::this_thread::get_id()<<" ";
    // Write exactly size() bytes. Streaming the unsigned char* directly would
    // treat it as a C string and read until the first NUL, ignoring the
    // datagram length.
    ss.write(reinterpret_cast<const char*>(pdata->data()),static_cast<std::streamsize>(pdata->size()));
    ss<<" "<<pdata->size()<<std::endl;
    std::cout<<ss.str()<<std::endl;
}
/// Demo driver: starts a UDP session towards 127.0.0.1:11223, runs the
/// io_context on three worker threads plus the main thread, and sends one
/// four-byte datagram ("ABC\n").
int main()
{
    asio::io_context ctx;
    //auto work_guard = asio::make_work_guard(ctx);
    std::cout<<"MAIN thread: "<<std::this_thread::get_id()<<std::endl;
    StaticBuffer b{4};
    b[0]='A';
    b[1]='B';
    b[2]='C';
    // The payload is 4 bytes long, so its last byte is index 3; index 4 lies
    // outside the bytes that are sent.
    b[3]='\n';
    UDPSeassion client(ctx,"127.0.0.1",11223);
    client.set_read_data_headnler(handler_read);
    client.start();
    std::vector<std::thread> threads;
    for (int i=0;i<3;++i)
    {
        threads.emplace_back([&](){
            std::stringstream ss;
            ss<<"run thread: "<<std::this_thread::get_id()<<std::endl;
            std::cout<<ss.str();
            ctx.run();
            std::cout<<"end thread\n";
        }
        );
    }
    client.async_send(std::make_unique<StaticBuffer>(b));
    // The main thread serves the io_context as well.
    ctx.run();
    for (auto& t:threads)
        t.join();
    // Normal termination is reported as success.
    return 0;
}
In the code above, the main emphasis is on the UDPSeassion class. The StaticBuffer class is only a helper that implements the basic functionality the session needs. I have some questions:
- Suppose that this class will be built into a system that runs at a frequency of ~100 Hz, i.e. every 10 ms the system will send its state through the client. 1.1 Is it implemented correctly for a multi-threaded environment, and how efficient is this implementation? 1.2 How efficient would a client implementation be that contains only a single internal thread serving both reading and writing (example)?
- Is the transfer of buffer ownership between tasks done correctly (std::move(unique_ptr_data))?
- In practice, how many threads are given to the client to process reads and writes?
- What about a TCP client?
I would be very grateful for detailed answers to my questions. Thank you very much!