I'm working on a pair of UDP client/server applications. The application that sends commands works excellently - I can monitor what's being sent to the port via nc and hexdump, and the messages decode perfectly.
In the application that should receive the commands, I'm using recvfrom with the MSG_DONTWAIT flag. I'm doing this because I also need to check a queue for things to be sent, so leaving the call blocking is not an option. If I remove the MSG_DONTWAIT flag, messages are received and processed correctly, but the call blocks while waiting, which won't work for my application. When using MSG_DONTWAIT, it always returns -1 and sets errno to EAGAIN. While this would be expected when nothing is being sent, it NEVER receives anything at all. I would expect it to return EAGAIN only until something is available, but that doesn't appear to be the case. Relevant code is posted below - what am I missing?
uint8_t Receiver::Setup(uint16_t rx_port, uint16_t tx_port)
{
std::stringstream ss;
ss << "UDP session manager, setup ports.";
Logger::Info(ss.str());
tx_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
rx_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (rx_socket_fd < 0)
{
Logger::Error("Could not open an rx UDP socket!");
}
else
{
std::cout << "rx_socket_fd is " << rx_socket_fd << "\n";
}
if (tx_socket_fd < 0)
{
Logger::Error("Could not open an tx UDP socket!");
}
else
{
std::cout << "tx_socket_fd is " << tx_socket_fd << "\n";
}
int reuse = 1;
if (setsockopt(tx_socket_fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
Logger::Warn("Could not set socket reuse!");
#ifdef SO_REUSEPORT
reuse = 1;
if (setsockopt(tx_socket_fd, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0)
Logger::Warn("setsockopt(SO_REUSEPORT) failed");
#endif
reuse = 1;
if (setsockopt(rx_socket_fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
Logger::Warn("Could not set socket reuse!");
#ifdef SO_REUSEPORT
reuse = 1;
if (setsockopt(rx_socket_fd, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0)
Logger::Warn("setsockopt(SO_REUSEPORT) failed");
#endif
memset(&tx_sockaddr, 0, sizeof(tx_sockaddr));
memset(&rx_sockaddr, 0, sizeof(rx_sockaddr));
tx_sockaddr.sin_family = AF_INET;
tx_sockaddr.sin_addr.s_addr = INADDR_ANY;
tx_sockaddr.sin_port = htons(tx_port);
rx_sockaddr.sin_family = AF_INET;
rx_sockaddr.sin_addr.s_addr = INADDR_ANY;
rx_sockaddr.sin_port = htons(rx_port);
int rva = 0;
rva = bind(tx_socket_fd, (const struct sockaddr *) &tx_sockaddr, sizeof(tx_sockaddr) );
if (rva < 0)
{
std::stringstream ss;
ss << "UDP SessionManager: Could not bind to tx socket (bind returned error code " << rva << ", errno is " << errno << ")";
Logger::Error(ss.str());
}
rva = bind(rx_socket_fd, (const struct sockaddr *) &rx_sockaddr, sizeof(rx_sockaddr) );
if (rva < 0)
{
std::stringstream ss;
ss << "UDP SessionManager: Could not bind to rx socket (bind returned error code " << rva << ", errno is " << errno << ")";
Logger::Error(ss.str());
}
return NO_ERROR;
}
/// Encodes a telemetry object and sends it as one UDP datagram on the tx
/// socket to the address stored in tx_sockaddr.
///
/// The number of bytes sent is taken from the `length` field of the UDPHeader
/// found at the start of the encoded buffer. The encoded buffer is released
/// on every path after a successful encode; `telemetry` itself is not freed
/// here.
///
/// @param telemetry telemetry object handed to EncodeTelemetryToSend
/// @return 0 when the datagram was sent (or sendto reported EINVAL, which is
///         only logged as a warning); 1 when encoding failed or sendto failed
///         with any other error
uint8_t Receiver::SendTelemetry(const TelemetryBase * telemetry)
{
    const uint8_t * bytes = EncodeTelemetryToSend(telemetry);
    if (bytes == NULL)
    {
        Logger::Error("Receiver: Something went wrong trying to encode the telemetry.");
        return 1;
    }

    const UDPHeader * header = (const UDPHeader * ) bytes;
    const uint16_t numBytes = header->length;
    {
        std::stringstream ss;
        ss << "Receiver::SendTelemetry - bytesToWrite is " << numBytes << "\n";
        Logger::Info(ss.str());
    }

    const int rva = sendto(tx_socket_fd, (const char *) bytes, numBytes, 0, (const struct sockaddr *) &tx_sockaddr, sizeof(struct sockaddr_in) );
    // Capture errno right away: the sleep and the stream operations below are
    // allowed to overwrite it.
    const int sendErrno = errno;
    std::this_thread::sleep_for(std::chrono::milliseconds(10));

    uint8_t result = 0;
    if (rva == -1 && sendErrno == EINVAL)
    {
        // NOTE(review): EINVAL is reported but still treated as success, as in
        // the original code - confirm this is intentional.
        Logger::Warn("invalid argument!");
    }
    else if (rva < 0)
    {
        // A fresh stream is used here; std::stringstream::clear() only resets
        // the error flags and would leave the earlier text in the message.
        std::stringstream ss;
        ss << "Failed to write to the UDP port, errno is " << sendErrno;
        Logger::Warn(ss.str());
        result = 1;
    }

    // Release the encoded buffer on the failure path too, not just on success.
    // NOTE(review): if EncodeTelemetryToSend allocates with new[], this must
    // be delete[] - confirm against the encoder.
    delete bytes;
    return result;
}
/// Encodes a command object and sends it as one UDP datagram on the tx socket
/// to the address stored in tx_sockaddr.
///
/// The number of bytes sent is taken from the `length` field of the UDPHeader
/// found at the start of the encoded buffer. The encoded buffer is released
/// on every path after a successful encode; `command` itself is not freed
/// here.
///
/// @param command command object handed to EncodeCommandToSend
/// @return 0 when the datagram was sent; 1 when encoding or sendto failed
uint8_t Receiver::SendCommand(const CommandBase * command)
{
    const uint8_t * bytes = EncodeCommandToSend(command);
    if (bytes == NULL)
    {
        Logger::Error("Receiver: Something went wrong trying to encode the message.");
        return 1;
    }

    const UDPHeader * header = (const UDPHeader * ) bytes;
    const uint16_t numBytes = header->length;
    {
        std::stringstream ss;
        ss << "Receiver::SendCommand - bytesToWrite is " << numBytes << "\n";
        Logger::Info(ss.str());
    }

    const int rva = sendto(tx_socket_fd, (const char *) bytes, numBytes, 0, (const struct sockaddr *) &tx_sockaddr, sizeof(struct sockaddr_in) );
    // Capture errno right away: the sleep and the stream operations below are
    // allowed to overwrite it.
    const int sendErrno = errno;
    std::this_thread::sleep_for(std::chrono::milliseconds(10));

    uint8_t result = 0;
    if (rva < 0)
    {
        // A fresh stream is used here; std::stringstream::clear() only resets
        // the error flags and would leave the earlier text in the message.
        std::stringstream ss;
        ss << "Failed to write to the UDP port, errno is " << sendErrno;
        Logger::Warn(ss.str());
        result = 1;
    }

    // Release the encoded buffer on the failure path too, not just on success.
    // NOTE(review): if EncodeCommandToSend allocates with new[], this must be
    // delete[] - confirm against the encoder.
    delete bytes;
    return result;
}
/// Polls the rx socket once, without blocking, for a single datagram and, if
/// one is available, decodes it and enqueues the resulting command.
///
/// recvfrom is called with MSG_DONTWAIT, so when no datagram is queued it
/// returns -1 (errno EAGAIN/EWOULDBLOCK per the recvfrom documentation); that
/// case is logged at debug level and is not treated as an error.
///
/// @return UDP_ERROR_DECODE_FAILED when a datagram was read but
///         DecodeReceivedCommand returned NULL; UDP_ERROR_NO_ERROR otherwise
///         (including when nothing was read)
uint8_t Receiver::Receive()
{
    uint8_t inputBuffer[UDP_BUFFER_BYTES];
    memset(inputBuffer, '\0', UDP_BUFFER_BYTES);

    struct sockaddr_in sender;
    memset(&sender, 0, sizeof(sender));
    // The address length is a value-result argument: on entry it must hold the
    // size of the buffer behind `sender`. Passing it uninitialised hands the
    // kernel an arbitrary length.
    socklen_t len = sizeof(sender);

    const ssize_t totalBytesRead = recvfrom(rx_socket_fd, (char *) inputBuffer, UDP_BUFFER_BYTES,
                                            MSG_DONTWAIT, (struct sockaddr *) &sender, &len );
    // Capture errno before any stream construction can overwrite it.
    const int recvErrno = errno;

    if ( totalBytesRead < 0 )
    {
        std::stringstream ss;
        ss << "UDP port rva = " << totalBytesRead << ", errno is " << recvErrno;
        Logger::Debug(ss.str());
        return UDP_ERROR_NO_ERROR;
    }

    std::stringstream ss;
    ss << "UDP port read " << totalBytesRead << " bytes";
    Logger::Info(ss.str() );

    const CommandBase * command = DecodeReceivedCommand(inputBuffer);
    if (command == NULL)
    {
        Logger::Warn("Failed to decode received command from commanding app.");
        return UDP_ERROR_DECODE_FAILED;
    }
    EnqueCommand(command);
    return UDP_ERROR_NO_ERROR;
}
void Receiver::ProcessingLoopThread()
{
while ( GetState() == STATE_RUN )
{
const TelemetryBase * telemetry = DequeTelemetry();
while (telemetry != NULL)
{
std::stringstream ss;
ss << "Receiver sending telemetry with ID: " << telemetry->GetTelemetryID();
Logger::Debug(ss.str());
SendTelemetry(telemetry);
delete telemetry;
telemetry = DequeTelemetry();
}
Receive();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}