0

I'm working on a UDP client / server couple of applications, and my application which sends commands works excellently - I can monitor what's being sent to the port via nc and hexdump and they decode perfectly.

On my application which should be receiving the commands, I'm using recvfrom with the MSG_DONTWAIT flag. I'm doing this because I need to check a queue for things to be sent as well, so just leaving it blocking is not an option. IF I remove the MSG_DONTWAIT flag, messages are received and processed correctly, but it blocks waiting which won't work for my application. When using MSG_DONTWAIT, it always returns -1 and sets errno to EAGAIN. While this would be expected when nothing is being sent, it NEVER receives anything at all. I would think it would return EAGAIN until something is available, but that doesn't appear to be the case. Relevant code is posted below - what am I missing?

uint8_t Receiver::Setup(uint16_t rx_port, uint16_t tx_port)
{

    std::stringstream ss;
    ss << "UDP session manager, setup ports.";
    Logger::Info(ss.str());

    tx_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
    rx_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);

    if (rx_socket_fd < 0)
    {
        Logger::Error("Could not open an rx UDP socket!");
    }
    else
    {
        std::cout << "rx_socket_fd is " << rx_socket_fd << "\n";
    }
    if (tx_socket_fd < 0)
    {
        Logger::Error("Could not open an tx UDP socket!");
    }
    else
    {
        std::cout << "tx_socket_fd is " << tx_socket_fd << "\n";
    }


    int reuse = 1;
    if (setsockopt(tx_socket_fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
        Logger::Warn("Could not set socket reuse!");

    #ifdef SO_REUSEPORT
    reuse = 1;
        if (setsockopt(tx_socket_fd, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0)
            Logger::Warn("setsockopt(SO_REUSEPORT) failed");
    #endif

    reuse = 1;
    if (setsockopt(rx_socket_fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
        Logger::Warn("Could not set socket reuse!");

    #ifdef SO_REUSEPORT
    reuse = 1;
        if (setsockopt(rx_socket_fd, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0)
            Logger::Warn("setsockopt(SO_REUSEPORT) failed");
    #endif

    memset(&tx_sockaddr, 0, sizeof(tx_sockaddr));
    memset(&rx_sockaddr, 0, sizeof(rx_sockaddr));

    tx_sockaddr.sin_family = AF_INET;
    tx_sockaddr.sin_addr.s_addr = INADDR_ANY;
    tx_sockaddr.sin_port = htons(tx_port);

    rx_sockaddr.sin_family = AF_INET;
    rx_sockaddr.sin_addr.s_addr = INADDR_ANY;
    rx_sockaddr.sin_port = htons(rx_port);

    int rva = 0;

    rva = bind(tx_socket_fd, (const struct sockaddr *) &tx_sockaddr, sizeof(tx_sockaddr) );

    if (rva < 0)
    {
        std::stringstream ss;
        ss << "UDP SessionManager: Could not bind to tx socket (bind returned error code " << rva << ", errno is " << errno << ")";
        Logger::Error(ss.str());
    }

    rva = bind(rx_socket_fd, (const struct sockaddr *) &rx_sockaddr, sizeof(rx_sockaddr) );

    if (rva < 0)
    {
        std::stringstream ss;
        ss << "UDP SessionManager: Could not bind to rx socket (bind returned error code " << rva << ", errno is " << errno << ")";
        Logger::Error(ss.str());
    }

    return NO_ERROR;
}


uint8_t Receiver::SendTelemetry(const TelemetryBase * telemetry)
{
    const uint8_t * bytes = EncodeTelemetryToSend(telemetry);

    if (bytes == NULL)
    {
        Logger::Error("Receiver: Something went wrong trying to encode the telemetry.");
        return 1;
    }

    const UDPHeader * header = (const UDPHeader * ) bytes;
    uint16_t numBytes = header->length;

    std::stringstream ss;
    ss << "Receiver::SendTelemetry - bytesToWrite is " << numBytes << "\n";
    Logger::Info(ss.str());

    int rva = sendto(tx_socket_fd, (const char *) bytes, numBytes, 0, (const struct sockaddr *) &tx_sockaddr, sizeof(struct sockaddr_in) );

    std::this_thread::sleep_for(std::chrono::milliseconds(10));

    if (rva == -1  && errno == EINVAL)
    {
        ss.clear();
        ss << "invalid argument!";
        Logger::Warn(ss.str());
    }
    else if (rva < 0)
    {
        ss.clear();

        ss << "Failed to write to the UDP port, errno is " << errno;

        Logger::Warn(ss.str());
        return 1;
    }

    delete bytes;

    return 0;
}



uint8_t Receiver::SendCommand(const CommandBase * command)
{
    const uint8_t * bytes = EncodeCommandToSend(command);

    if (bytes == NULL)
    {
        Logger::Error("Receiver: Something went wrong trying to encode the message.");
        return 1;
    }

    const UDPHeader * header = (const UDPHeader * ) bytes;
    uint16_t numBytes = header->length;

    std::stringstream ss;
    ss << "Receiver::SendCommand - bytesToWrite is " << numBytes << "\n";
    Logger::Info(ss.str());

    int rva = sendto(tx_socket_fd, (const char *) bytes, numBytes, 0, (const struct sockaddr *) &tx_sockaddr, sizeof(struct sockaddr_in) );

    std::this_thread::sleep_for(std::chrono::milliseconds(10));

    if (rva < 0)
    {
        ss.clear();

        ss << "Failed to write to the UDP port, errno is " << errno;

        Logger::Warn(ss.str());
        return 1;
    }

    delete bytes;

    return 0;
}

uint8_t Receiver::Receive()
{
    uint8_t inputBuffer[UDP_BUFFER_BYTES];
    memset(inputBuffer, '\0', UDP_BUFFER_BYTES);

    int totalBytesRead = 0;

    //socklen_t addressLength = sizeof(rx_sockaddr);
    struct sockaddr_in sender;
    socklen_t len;

    totalBytesRead = recvfrom(rx_socket_fd, (char *) inputBuffer, UDP_BUFFER_BYTES,
                          MSG_DONTWAIT, (struct sockaddr *)  &sender, &len );

    if ( totalBytesRead >= 0 )
    {
        std::stringstream ss;
        ss << "UDP port read " << totalBytesRead << " bytes";
        Logger::Info(ss.str() );

        const CommandBase * command = DecodeReceivedCommand(inputBuffer);

        if (command == NULL)
        {
            Logger::Warn("Failed to decode received command from commanding app.");
            return UDP_ERROR_DECODE_FAILED;
        }

        EnqueCommand(command);

    }
    else
    {
        std::stringstream ss;
        ss << "UDP port rva = " << totalBytesRead << ", errno is " << errno;
        Logger::Debug(ss.str());
    }

    return UDP_ERROR_NO_ERROR;
}



void Receiver::ProcessingLoopThread()
{
    while ( GetState() == STATE_RUN )
    {
        const TelemetryBase * telemetry = DequeTelemetry();

        while (telemetry != NULL)
        {
            std::stringstream ss;
            ss << "Receiver sending telemetry with ID: " << telemetry->GetTelemetryID();
            Logger::Debug(ss.str());

            SendTelemetry(telemetry);
            delete telemetry;
            telemetry = DequeTelemetry();
        }

        Receive();

        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}
Shawn
  • 47,241
  • 3
  • 26
  • 60
trycatch
  • 568
  • 1
  • 8
  • 17
  • This doesn't answer your question, but as an aside you might be interested in learning about the self-pipe trick, as it allows you to avoid indefinite blocking inside `recv()` and and simultaneously also avoid wasteful polling of the socket: https://stackoverflow.com/questions/384391/how-to-signal-select-to-return-immediately – Jeremy Friesner Dec 29 '18 at 00:49
  • It's quite unusual to use two different sockets for sending and receiving. Is there some particular reason you're doing things that way? Also, can you give us the actual involved ports and addresses? It's hard to tell which match and which don't. (For example does `tx_sockaddr`'s value change between the bind and the sendto? If not, why is the socket sending to itself?) – David Schwartz Dec 29 '18 at 03:30

3 Answers3

2

A few things:

This may not be an issue, but I would encourage you capture the errno before any other code has a chance to run and clear errno. Instead of this:

    std::stringstream ss;
    ss << "UDP port rva = " << totalBytesRead << ", errno is " << errno;

Better:

totalBytesRead = recvfrom(rx_socket_fd,...
int lasterror = errno; // catch errno before anything else can change it


. . .
ss << "UDP port rva = " << totalBytesRead << ", errno is " << lasterror;

Back to your original issue.

I'm guessing that you need to poll the socket more than once when using the non-blocking MSG_DONTWAIT flag.

It looks like your main loop is sleeping for 10 milliseconds between each poll of the socket. If that's your design, then just do this:

When you create the socket, set a 10 millisecond timeout on it:

timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 10 * 1000; // 10 milliseconds
setsockopt(rx_socket_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));

Then simply remove the MSG_DONTWAIT flag from the recvfrom call.

Also, remove the sleep statement in your main loop:

 std::this_thread::sleep_for(std::chrono::milliseconds(10));

And then gracefully handle the timeout error as a benign thing that can happen

totalBytesRead = recvfrom(rx_socket_fd, (char *) inputBuffer, UDP_BUFFER_BYTES,
                      0, (struct sockaddr *)  &sender, &len );


if (totalBytesRead >= 0 )
{
    // data available - handle it
}
else
{
    if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
    {
        // socket timed out after waiting 10 milliseconds
    }
    else
    {
        // actual socket error
    }
}
selbie
  • 100,020
  • 15
  • 103
  • 173
  • Not sure if it is going to help anyone, but I was using socklen_t for return of recvfrom, but when you use MSG_DONTWAIT, you can get a negative number, which becomes a really high uint, so maybe the right way to do it use ssize_t (from https://man7.org/linux/man-pages/man2/recvfrom.2.html), because for me the comparison was failing – user27221 Apr 13 '22 at 13:13
1
struct sockaddr_in sender;
socklen_t len;

totalBytesRead = recvfrom(rx_socket_fd, (char *) inputBuffer, UDP_BUFFER_BYTES,
                      MSG_DONTWAIT, (struct sockaddr *)  &sender, &len );

You did not assign len a sensible value. If you don't initialize len to the size of the socket address, the call can fail.

Also, you capture errno too late. You must capture it immediately after the call that gets the error. Otherwise, other intermediary operations can change its value. So you can't rely on getting a sensible value.

Your use of separate sockets for sending and receiving is very strange. If you are sending and receiving to the same other endpoint, you should use only a single socket.

David Schwartz
  • 179,497
  • 17
  • 214
  • 278
-1

MSG_DONTWAIT (since Linux 2.2) Enables nonblocking operation; if the operation would block, the call fails with the error EAGAIN or EWOULDBLOCK (this can also be enabled using the O_NONBLOCK flag with the F_SETFL fcntl(2)).

litsun
  • 1
  • 1
  • I don't think this is constructive answer, you could improve the answer by adding code example to support your answer. – STEEL Jul 21 '21 at 14:15