I am developing a QT 6 Widget based UDP audio application that repeatedly sends out a single UDP audio frame sample (4K bytes sine wave tone) to a remote UDP echo server at a predetermined rate - (right now the echo server is hosted locally though).
The UDP echo server is based on the asynchronous UDP echo server sample developed by the asio author (not me). This is shown below (slightly modified to include a hard coded 4K block for testing purposes). The application is also launched with a port parameter 1234 - so it listens on port 1234 for the incoming audio packet that it will echo back to client.
//
// async_udp_echo_server.cpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2022 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <cstdlib>
#include <iostream>
#include <asio/ts/buffer.hpp>
#include <asio/ts/internet.hpp>
using asio::ip::udp;
class server {
public:
server(asio::io_context& io_context, short port)
: socket_(io_context, udp::endpoint(udp::v4(), port)) {
do_receive();
}
void do_receive() {
socket_.async_receive_from(
asio::buffer(data_, max_length), sender_endpoint_,
[this](std::error_code ec, std::size_t bytes_recvd) {
if (!ec && bytes_recvd > 0) {
do_send(bytes_recvd);
} else {
do_receive();
}
});
}
void do_send(std::size_t length) {
socket_.async_send_to(
asio::buffer(data_, length), sender_endpoint_,
[this](std::error_code /*ec*/, std::size_t /*bytes_sent*/) {
do_receive();
});
}
private:
udp::socket socket_;
udp::endpoint sender_endpoint_;
enum { max_length = 4096 };
char data_[max_length]{};
};
int main(int argc, char* argv[]) {
try {
if (argc != 2) {
std::cerr << "Usage: async_udp_echo_server <port>\n";
return 1;
}
asio::io_context io_context;
server s(io_context, std::atoi(argv[1]));
io_context.run();
} catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
I currently have this working successfully in the client as a stand alone asio worker thread, however since I need to graphically display the returned audio packets, I cannot use the stand alone asio thread approach; I need to use QT with its signals/slots async magic instead.
For the purposes of illustration, I also include my working asio client code that runs in a separate joinable thread. This client thread uses a asio::steady_timer that fires an asynchronous 4k UDP packet repeatedly to the echo server. The code also compares the echoed back contents to this outgoing audio sample successfully.
void
RTPClient::start() {
mpSendEndpoint = std::make_unique<ip::udp::endpoint>(
ip::address::from_string(mConfig.mHostName),
mConfig.mPortNum);
mpSocket = std::make_unique<ip::udp::socket>(
mIOContext, mpSendEndpoint->protocol());
mpSocketTimer = std::make_unique<steady_timer>(
mIOContext);
mWorker = std::thread([this]() {
mIOContext.run();
});
if (!mShutdownFlag) {
// kick off the async chain by immediate timeout
mpSocketTimer->expires_after(std::chrono::seconds(0));
mpSocketTimer->async_wait([this]<typename T0>(T0&& ec) {
handle_timeout(std::forward<T0>(ec));
});
}
}
void
RTPClient::handle_timeout(const error_code& ec)
{
if (!ec && !mShutdownFlag) {
if (!mpAudioOutput) {
// check to see if there is new audio test data waiting in queue
if (const auto audioData = mIPCQueue->try_pop(); audioData) {
// new audio waiting, copy the data to mpAudioTXData and allocate an identically
// sized receive buffer to receive the echo replies from the server
mpAudioInput = std::make_unique<AudioDatagram>(audioData->first.size());
mpAudioOutput = std::make_unique<AudioDatagram>(std::move(audioData->first));
mAudioBlockUSecs = audioData->second;
} else {
mpSocketTimer->expires_after(seconds(1));
mpSocketTimer->async_wait([this]<typename T0>(T0&& ec) {
handle_timeout(std::forward<T0>(ec));
});
// nothing to send as waveform data not received from GUI.
// short circuit return with a 1 sec poll
return;
}
}
mpSocket->async_send_to(asio::buffer(
mpAudioOutput.get(), mpAudioOutput->size()),
*mpSendEndpoint, [this]<typename T0, typename T1>(T0&& ec, T1&& bytes_transferred) {
handle_send_to(std::forward<T0>(ec), std::forward<T1>(bytes_transferred));
});
}
}
void
RTPClient::handle_send_to(const error_code& ec, std::size_t bytes_transferred) {
if (!ec && bytes_transferred > 0 && !mShutdownFlag) {
mpSocketTimer->expires_after(microseconds(mAudioBlockUSecs));
mpSocketTimer->async_wait([this]<typename T0>(T0&& ec) {
handle_timeout(std::forward<T0>(ec));
});
mpSocket->async_receive_from(asio::buffer(
mpAudioInput.get(), mpAudioInput->size()), *mpSendEndpoint,
[this]<typename T0, typename T1>(T0&& ec, T1&& bytes_transferred) {
handle_receive(std::forward<T0>(ec), std::forward<T1>(bytes_transferred));
});
}
}
void
RTPClient::handle_receive(const error_code& ec, std::size_t bytes_transferred) {
if (!ec && bytes_transferred > 0) {
double foo = 0.0;
for (const auto next : *mpAudioOutput) {
foo += (double)next;
}
double bar = 0.0;
for (const auto next : *mpAudioInput) {
bar += (double)next;
}
if (foo != bar)
{
auto baz = 0;
(void)baz;
}
}
}
/**
* Shutdown the protocol instance by shutting down the IPC
* queue and closing the socket and associated timers etc.
*
* <p>This is achieved by setting a flag which is read by the
* busy loop as an exit condition.
*/
void
RTPClient::shutdown() {
// set the shared shutdown flag
mShutdownFlag = true;
// wake up any locked threads so they can see the above flag
if (mIPCQueue) {
mIPCQueue->shutdown();
}
// stop the socket timer - do not reset it
// as there are some time sensitive parts in the code
// where mpSocketTimer is dereferenced
if (mpSocketTimer) {
mpSocketTimer->cancel();
}
std::error_code ignoredError;
// close the socket if we created & opened it, making
// sure that we close down both ends of the socket.
if (mpSocket && mpSocket->is_open()) {
mpSocket->shutdown(ip::udp::socket::shutdown_both, ignoredError);
// reset so we will reallocate and then reopen
// via boost::async_connect(...) later.
mpSocket.reset();
}
// wait for the any other detached threads to see mShutdownFlag
// as it is running in a detached mWorkerThread which sleeps
// for 50ms CDU key polling requests.
std::this_thread::sleep_for(milliseconds(200));
}
I need to replace this separate asio client thread code with a QUdpSocket based client code to do the equivalent, as I need to use signals/slots to notify the GUI when the blocks arrive and display the returned waveform in a widget. To this end I have the following QT worker thread. I can see that the asio echo server receives the datagram, however I do not know how to receive the echoed contents back into the client. Is there some bind or connect call that I need to do on the client side. I am totally confused with when to call bind and when to call connect on UDP sockets.
// SYSTEM INCLUDES
//#include <..>
// APPLICATION INCLUDES
#include "RTPSession.h"
// DEFINES
// EXTERNAL FUNCTIONS
// EXTERNAL VARIABLES
// CONSTANTS
// STRUCTS
// FUNCTIONS
// NAMESPACE USAGE
using namespace std::chrono;
// STATIC VARIABLE INITIALIZATIONS
std::mutex RTPSession::gMutexGuard;
RTPSession::RTPSession(QObject* parent)
: QObject(parent)
, mpSocket{ std::make_unique<QUdpSocket>(parent) }
{
mpSocket->bind(45454, QUdpSocket::DefaultForPlatform);
connect(mpSocket.get(), &QUdpSocket::readyRead,
this, &RTPSession::processPendingDatagrams);
}
/**
* Thread function that listens RTP session updates.
*
* <p>The implementation polls for shutdown every second.
*
* @param rRTPInfo [in] qt thread parameters.
*/
void
RTPSession::doWork(
const std::tuple<int32_t, int32_t, int32_t>& /*rRTPInfo*/)
{
try {
// just dispatched, so reset exit flag
mExitWorkLoop = false;
int frameCounter = 0;
while (!mExitWorkLoop) {
constexpr auto gPollMillis = 1000;
// poll using shortest (non zero) interval in schedule
std::unique_lock<std::mutex> lk(gMutexGuard);
mCondVariable.wait_for(lk, milliseconds(gPollMillis),
[this] { return mExitWorkLoop; });
QByteArray datagram = "Broadcast message " + QByteArray::number(frameCounter++);
mpSocket->writeDatagram(datagram.data(), datagram.size(),
QHostAddress::LocalHost, 1234);
if (mpSocket->hasPendingDatagrams()) {
//mpSocket->readDatagram()
int t = 0;
(void)t;
}
// update GUI with the audio stats - add more later
emit updateProgress(frameCounter++);
}
} catch (const std::exception& rEx) {
// exit thread with the exception details
emit finishWork(tr("exiting worker, error:") + rEx.what());
}
// exit thread with status bar message
emit finishWork(tr("finished"));
}
void
RTPSession::shutdown()
{
// Critical section.
std::scoped_lock<std::mutex> lock(gMutexGuard);
mExitWorkLoop = true;
// Notify the potentially sleeping thread that is
// waiting for up to 1 second
mCondVariable.notify_one();
}
void
RTPSession::processPendingDatagrams() {
QByteArray datagram;
while (mpSocket->hasPendingDatagrams()) {
datagram.resize(int(mpSocket->pendingDatagramSize()));
mpSocket->readDatagram(datagram.data(), datagram.size());
//statusLabel->setText(tr("Received datagram: \"%1\"")
// .arg(datagram.constData()));
}
}