2

I'm new to C++ but so far most of the asio stuff has made sense. I am however stuggling to get my UDPServer working.

My question is possibly similar to: Trying to write UDP server class, io_context doesn't block

I think my UDPServer stops before work can be given to its io_context. However, I am issuing work to the context before calling io_context.run() so I don't understand why.

Of course, I am not entirely sure if I am even on the right track with the above statement and would appreciate some guidance. Here is my class:

template<typename message_T>
    class UDPServer
    {
    public:
        UDPServer(uint16_t port)
            : m_socket(m_asioContext, asio::ip::udp::endpoint(asio::ip::udp::v4(), port))
        {
            m_port = port;
        }

        virtual ~UDPServer()
        {
            Stop();
        }



    public:

        // Starts the server!
        bool Start()
        {
            try
            {
                // Issue a task to the asio context
                WaitForMessages();

                m_threadContext = std::thread([this]() { m_asioContext.run(); });
            }
            catch (std::exception& e)
            {
                // Something prohibited the server from listening
                std::cerr << "[SERVER @ PORT " << m_port << "] Exception: " << e.what() << "\n";
                return false;
            }

            std::cout << "[SERVER @ PORT " << m_port << "] Started!\n";
            return true;
        }

        // Stops the server!
        void Stop()
        {
            // Request the context to close
            m_asioContext.stop();

            // Tidy up the context thread
            if (m_threadContext.joinable()) m_threadContext.join();

            // Inform someone, anybody, if they care...
            std::cout << "[SERVER @ PORT " << m_port << "] Stopped!\n";
        }


        void WaitForMessages()
        {

            m_socket.async_receive_from(asio::buffer(vBuffer.data(), vBuffer.size()), m_endpoint,
                [this](std::error_code ec, std::size_t length)
                {
                    if (!ec)
                    {
                        
                        std::cout << "[SERVER @ PORT " << m_port << "] Got " << length << " bytes \n Data: " << vBuffer.data() << "\n" << "Address: " << m_endpoint.address() << " Port: " << m_endpoint.port() << "\n" << "Data: " << m_endpoint.data() << "\n";

                    }
                    else
                    {
                        std::cerr << "[SERVER @ PORT " << m_port << "] Exception: " << ec.message() << "\n";
                        return;
                    }

                    WaitForMessages();
                }
            );
        }

        void Send(message_T& msg, const asio::ip::udp::endpoint& ep)
        {
            asio::post(m_asioContext,
                [this, msg, ep]()
                {
                    // If the queue has a message in it, then we must
                    // assume that it is in the process of asynchronously being written.
                    
                    bool bWritingMessage = !m_messagesOut.empty();
                    m_messagesOut.push_back(msg);
                    if (!bWritingMessage)
                    {
                        WriteMessage(ep);
                    }
                }
            );
        }

    private:

        void WriteMessage(const asio::ip::udp::endpoint& ep)
        {

            m_socket.async_send_to(asio::buffer(&m_messagesOut.front(), sizeof(message_T)), ep,
                [this, ep](std::error_code ec, std::size_t length)
                {

                    if (!ec)
                    {

                        m_messagesOut.pop_front();

                        // If the queue is not empty, there are more messages to send, so
                        // make this happen by issuing the task to send the next header.
                        if (!m_messagesOut.empty())
                        {
                            WriteMessage(ep);
                        }
                        
                    }
                    else
                    {
                        std::cout << "[SERVER @ PORT " << m_port << "] Write Header Fail.\n";
                        m_socket.close();
                    }
                });
        }

        void ReadMessage()
        {
        }

    private:
        
        uint16_t m_port = 0;
        asio::ip::udp::endpoint m_endpoint;
        std::vector<char> vBuffer = std::vector<char>(21);


    protected:
        TSQueue<message_T> m_messagesIn;
        TSQueue<message_T> m_messagesOut;
        Message<message_T> m_tempMessageBuf;
        asio::io_context m_asioContext;
        std::thread m_threadContext;
        asio::ip::udp::socket m_socket;
    };
}

Code is invoked in the main function for now:

enum class TestMsg {
    Ping,
    Join,
    Leave
};

int main() {
    Message<TestMsg> msg; // Message is a pretty basic struct that I'm not using yet. When I was, I was only receiving the first 4 bytes - which led me down this path of investigation
    msg.id = TestMsg::Join;
    msg << "hello";

    UDPServer<Message<TestMsg>> server(60000);
}

When invoked the Server immediately exits before it gets chance to print "[SERVER] Started"

enter image description here

I'll try adding the work guard as the link post describes but I would still like to understand why the io_context is not being primed with work quick enough.

JTInfinite
  • 360
  • 4
  • 14
  • I don't see an immediate problem in the code shown (the value 21 in the vector initializer should be sizeof(message_T), but that won't stop it from listening). Where is this code invoked? You could be running into a problem in the destructor (like the other linked question) – Dave S Oct 05 '21 at 21:43
  • As a note, I find it useful to add debugging to the thread when `io_context.run()` returns and when you're calling `io_context.stop()`, to confirm that it is exiting early, and that stop isn't being called explicitly. – Dave S Oct 05 '21 at 21:56
  • @DaveS I've added my main function and updated with some output. Nothing fancy. Thanks for the response. – JTInfinite Oct 06 '21 at 07:52
  • @DaveS when you say add debugging to the thread - how do you mean? – JTInfinite Oct 06 '21 at 07:54

2 Answers2

2

Update (Now I also read the question not just the code) While in WaitForMessages you do start listening by calling the m_socket.async_receive_from function, as it is async, that function will return/unblock as soon as it has setup the listening. So as long as you don't actually have a client sending you something, you server has nothing do to. Only when it has received something the callback will be called, by a thread calling io_context::run. So you need the work guard so that your thread running run won't unblock right after start, but will block as long as the work guard is there. Usually it is also combined with a try/while pattern if an exception gets thrown in a handler and you still want to move on with your server.

Also in the code you posted, you never actually call UDPServer::Start!


This was my first idea of an answer:

This is normal behavior of ASIO. The io_context::run function will return as soon as it has no work to do.

So to change the behaviour of the run function to block you have to use a boost::asio::executor_work_guard<boost::asio::io_context::executor_type> i.e. a so called work guard. Construct that object with a reference to your io_context and hold it i.e. don't let it destruct as long as you want to let the server run, i.e. do not want to let io_context::run return when there is not work.

So given

    boost::asio::io_context io_context_;
    boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard_;

you then could call

      work_guard_{boost::asio::make_work_guard(io_context_)},

const auto thread_count{std::max<unsigned>(std::thread::hardware_concurrency(), 1)};

std::generate_n(std::back_inserter(this->io_run_threads_),
                thread_count,
                [this]() {
                    return std::thread{io_run_loop,
                                       std::ref(this->io_context_), std::ref(this->error_handler_)};
                });

void io_run_loop(boost::asio::io_context &context,
                                    const std::function<void(std::exception &)> &error_handler) {
    while (true) {
        try {
            context.run();
            break;
        } catch (std::exception &e) {
            error_handler(e);
        }
    }

}

And then for server shutdown:

work_guard_.reset();
io_context_.stop();
std::for_each(this->io_run_threads_.begin(), this->io_run_threads_.end(), [](auto &thread) {
    if (thread.joinable()) thread.join();
});

For a more graceful shutdown you can omit the stop call and rather close all sockets before.

Superlokkus
  • 4,731
  • 1
  • 25
  • 57
  • Thanks so much for the explanation and good spot on the missed Start call! That was my omission and ultimately including a call to Start didn't change the issue I was having. Thank you for the posted code. I included a work guard but I am hesitant to introduce a "while(true)" loop into the code. – JTInfinite Oct 06 '21 at 11:36
  • Why is the while(true) necessary? Adding the work guard alone and omitting the loop seems to work fine...? – JTInfinite Oct 06 '21 at 11:37
  • 1
    I think your "update" is the crux of my misunderstanding - I thought giving the context a function, in this case "async_receive_from", was in essence giving the context repeating/infinite work to do. It seems that this is not the case. – JTInfinite Oct 06 '21 at 11:43
  • 1
    The as I said the while(true) is not really necessary in your case. I just wanted to mention this pattern here for completion since, if an exception gets thrown in your callbacks/server, `io_context::run` will unblock, regardless of the work guard, and you will terminate, often without being able to know what exception was thrown i.e. logging it. Hence the try around the run call. And if your server is so fault tolerant that it might can continue, you would want that loop. – Superlokkus Oct 06 '21 at 11:44
  • 1
    The whole idea of that "async" in academia called "reactor pattern" programming, is that while waiting to receive something indeed seems like infinite work, but in the context of operating systems it isn't: The network card and the OS knows best when you received something, so instead of the old synchronous way of saying "please receive and I'll wait here i.e. block" you say async "Please receive and call me back when you have something. Meanwhile I am free to do something else". And by I your program i.e. thread is meant. – Superlokkus Oct 06 '21 at 11:49
  • 1
    Brilliant - thanks again for the explanation!! I also understand the while loop now - like you say: "Usually it is also combined with a try/while pattern if an exception gets thrown in a handler and you still want to move on with your server." – JTInfinite Oct 06 '21 at 11:51
  • 2
    Hmm. My answer is late, but touches on a good number of issues not mentioned here :) +1 – sehe Oct 06 '21 at 12:06
2

Looks like you forgot to call server.Start();. Moreover, you will want to make the main thread wait for some amount of time, otherwise the destructor of Server will immediately cause Stop() to be called:

int main()
{
    Message<TestMsg> msg;
    msg.id = TestMsg::Join;
    msg << "hello";

    UDPServer<Message<TestMsg>> server(60000);
    server.Start();

    std::this_thread::sleep_for(30s);
}

Issues

  1. There is a conceptual problem with the Send API. It takes an endpoint on each call, but it only uses the one that starts the write call chain! This means that if you do

     srv.Send(msg1, {mymachine, 60001});
     srv.Send(msg1, {otherserver, 5517});
    

    It is likely they both get sent to mymachine:60001.

  2. How you treat the buffer received. Just using .data() blindly assumes that the data is NUL-terminated. Don't do that:

    std::string const data(vBuffer.data(), length);
    

    Also, you seem to have at some time been confused about data and printed m_endpoint.data() - your princess is in another castle.

    In reality you probably want ways to extract the typed data. I'm leaving that as beyond the scope of this question for today.

  3. Regardless you should clear the buffer before reuse, because you might be seeing old data in subsequent reads.

    vBuffer.assign(vBuffer.size(), '\0');
    
  4. This is most likely undefined behaviour:

    asio::buffer(&m_messagesOut.front(), sizeof(message_T)), ep,
    

    This is only valid if message_T is trivial and standard-layout ("POD" - Plain Old Data). The presence of operator<< strongly suggests that is not the case.

    Instead, build a (sequence of) buffer(s) hat represents the message as raw bytes, e.g.

    auto& msg = m_messagesOut.front();
    msg.length = msg.body.size();
    
    m_socket.async_send_to(
        std::vector<asio::const_buffer>{
            asio::buffer(&msg.id, sizeof(msg.id)),
            asio::buffer(&msg.length, sizeof(msg.length)),
            asio::buffer(msg.body),
        },
    
    // ...
    
  5. Thread safe queues seem to be overkill since you have a single service thread; that is an implicit "strand" so you can post to it to have single-threaded semantics.

Here's a few adaptations to make it work so far (except the exercise-for-the-reader pointed out):

Live On Coliru

#include <boost/asio.hpp>
#include <iostream>
#include <deque>
#include <sstream>

// Library facilities
namespace asio = boost::asio;
using asio::ip::udp;
using boost::system::error_code;
using namespace std::chrono_literals;

/////////////////////////////////
// mock ups:
template <typename message_T> struct Message {
    message_T   id;
    uint16_t    length; // automatically filled on send, UDP packets are < 64k
    std::string body;

    template <typename T> friend Message& operator<<(Message& m, T const& v)
    {
        std::ostringstream oss;
        oss << v;
        m.body += oss.str();
        //m.body += '\0'; // suggestion for easier message extraction

        return m;
    }
};

// Thread-safety can be replaced with the implicit strand of a single service
// thread
template <typename T> using TSQueue = std::deque<T>;
// end mock ups
/////////////////////////////////

template <typename message_T> class UDPServer {
  public:
    UDPServer(uint16_t port)
        : m_socket(m_asioContext, udp::endpoint(udp::v4(), port))
    {
        m_port = port;
    }

    virtual ~UDPServer() { Stop(); }

  public:
    // Starts the server!
    bool Start()
    {
        if (m_threadContext.joinable() && !m_asioContext.stopped())
            return false;

        try {
            // Issue a task to the asio context
            WaitForMessages();

            m_threadContext = std::thread([this]() { m_asioContext.run(); });
        } catch (std::exception const& e) {
            // Something prohibited the server from listening
            std::cerr << "[SERVER @ PORT " << m_port
                      << "] Exception: " << e.what() << "\n";
            return false;
        }
        std::cout << "[SERVER @ PORT " << m_port << "] Started!\n";
        return true;
    }

    // Stops the server!
    void Stop()
    {
        // Tell the context to stop processing
        m_asioContext.stop();

        // Tidy up the context thread
        if (m_threadContext.joinable())
            m_threadContext.join();

        // Inform someone, anybody, if they care...
        std::cout << "[SERVER @ PORT " << m_port << "] Stopped!\n";

        m_asioContext
            .reset(); // required in case you want to reuse this Server object
    }

    void Send(message_T& msg, const udp::endpoint& ep)
    {
        asio::post(m_asioContext, [this, msg, ep]() {
            // If the queue has a message in it, then we must
            // assume that it is in the process of asynchronously being written.

            bool bWritingMessage = !m_messagesOut.empty();
            m_messagesOut.push_back(msg);
            if (!bWritingMessage) {
                WriteMessage(ep);
            }
        });
    }

  private:
    void WaitForMessages() // assumed to be on-strand
    {
        vBuffer.assign(vBuffer.size(), '\0');
        m_socket.async_receive_from(
            asio::buffer(vBuffer.data(), vBuffer.size()), m_endpoint,
            [this](std::error_code ec, std::size_t length) {
                if (!ec) {
                    std::string const data(vBuffer.data(), length);

                    std::cout << "[SERVER @ PORT " << m_port << "] Got "
                              << length << " bytes \n Data: " << data << "\n"
                              << "Address: " << m_endpoint.address()
                              << " Port: " << m_endpoint.port() << "\n"
                              << std::endl;
                } else {
                    std::cerr << "[SERVER @ PORT " << m_port
                              << "] Exception: " << ec.message() << "\n";
                    return;
                }

                WaitForMessages();
            });
    }

    void WriteMessage(const udp::endpoint& ep)
    {
        auto& msg = m_messagesOut.front();
        msg.length = msg.body.size();

        m_socket.async_send_to(
            std::vector<asio::const_buffer>{
                asio::buffer(&msg.id, sizeof(msg.id)),
                asio::buffer(&msg.length, sizeof(msg.length)),
                asio::buffer(msg.body),
            },
            ep, [this, ep](std::error_code ec, std::size_t length) {
                if (!ec) {
                    m_messagesOut.pop_front();

                    // If the queue is not empty, there are more messages to
                    // send, so make this happen by issuing the task to send the
                    // next header.
                    if (!m_messagesOut.empty()) {
                        WriteMessage(ep);
                    }

                } else {
                    std::cout << "[SERVER @ PORT " << m_port
                              << "] Write Header Fail.\n";
                    m_socket.close();
                }
            });
    }

  private:
    uint16_t          m_port = 0;
    udp::endpoint     m_endpoint;
    std::vector<char> vBuffer = std::vector<char>(21);

  protected:
    TSQueue<message_T> m_messagesIn;
    TSQueue<message_T> m_messagesOut;
    Message<message_T> m_tempMessageBuf;

    asio::io_context m_asioContext;
    std::thread      m_threadContext;
    udp::socket      m_socket;
};

enum class TestMsg {
    Ping,
    Join,
    Leave
};

int main()
{
    UDPServer<Message<TestMsg>> server(60'000);
    if (server.Start()) {
        std::this_thread::sleep_for(3s);

        {
            Message<TestMsg> msg;
            msg.id = TestMsg::Join;
            msg << "hello PI equals " << M_PI  << " in this world";

            server.Send(msg, {{}, 60'001});
        }

        std::this_thread::sleep_for(27s);
    }
}

For some reason netcat doesn't work with UDP on Coliru, so here's a "live" demo:

enter image description here

You can see our netcat client messages arriving. You can see the message Sent to 60001 arriving in the tcpdump output.

sehe
  • 374,641
  • 47
  • 450
  • 633
  • Hey get your own questions to answer ;-) To be fair, nicely done also picking up all the other implicit problems. See you on twitter :-) – Superlokkus Oct 06 '21 at 12:20
  • Thanks so much sehe! I have read many of your answer on C++. 1. If I am passing a newly constructed endpoint for each call to Send, why would it not update the write chain? 2. The idea for this system is that only a fixed length message size can be passed. The buffer will (eventually) ultimately always be size of message_T. Is this ok or am I missing something still? 3. Thank you! 4. message_T is POD and uses static_assert(std::is_standard_layout:: etc... to check that. 5. I will review the TSQueue - thanks for the advice. I'm now reviewing your code and will comment shortly – JTInfinite Oct 06 '21 at 12:23
  • No work guard in your example, @sehe? – JTInfinite Oct 06 '21 at 14:38
  • I think I smell the book/sample that your Message<> apparently comes from and it probably uses only is_standard_layout to static_assert (which is not enough). I can tell because the specific combination of a Message template *and* the weird operator<> interface is rare enough that it is almost certainly the same as code I reviewed a while ago. Can you tell me the source of that example code? – sehe Oct 06 '21 at 17:41
  • Also Re.: work guard, as you can see none is needed in my example - given the live demos.Work guards keep a service thread waiting even when there is no explicit work. In this case, there is always explicit work (or, indeed, it is desired that the application exits) – sehe Oct 06 '21 at 17:43
  • Oh I forget to respond to _”1. If I am passing a newly constructed endpoint for each call to Send, why would it not update the write chain?”_ - Well the queue does not contain de endpoint, it only contains the messages. The endpoint is only “bound” (passed as function argument) to `WriteMessage`. Ask yourself: who calls `WriteMessage`, and when exactly? What will `ep` contain? – sehe Oct 06 '21 at 18:00
  • @Superlokkus Oh I think I’m just slow. I was working on the answer _for a while_ before yours was posted. In fact, my initial reply was a comment, but it kept growing as I reviewed deeper, so I upgraded to an answer :) Cheers – sehe Oct 06 '21 at 18:02
  • I think I found the source of that sample. Is it https://github.com/OneLoneCoder/olcPixelGameEngine/blob/master/Videos/Networking/Parts1%262/net_message.h? – sehe Oct 07 '21 at 00:19
  • @sehe yes, thats the one! (Sorry for massively delayed response - didn't know you had replied until reviewing this question!) – JTInfinite Jan 24 '22 at 08:30