
I am developing a Qt 6 Widgets-based UDP audio application that repeatedly sends a single audio frame (a 4 KB sine-wave tone) to a remote UDP echo server at a predetermined rate. For now, the echo server is hosted locally.
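
To make the frame size and send rate concrete, here is a minimal sketch of how a 4096-byte sine-wave frame and its playout interval could be computed. The 16-bit signed PCM format, the 48 kHz sample rate, the 1 kHz tone, and the names `makeSineFrame` and `frameDurationMicros` are all assumptions for illustration; the application's real audio format is not stated here.

```cpp
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper: build one frame of a sine tone as 16-bit signed PCM.
// frameBytes is the datagram payload size (4096 in this question).
std::vector<std::int16_t> makeSineFrame(double toneHz, double sampleRateHz,
                                        std::size_t frameBytes) {
    const double kTwoPi = 6.283185307179586;
    const double kAmplitude = 0.5 * 32767.0;  // half of full scale (assumed)
    std::vector<std::int16_t> frame(frameBytes / sizeof(std::int16_t));
    for (std::size_t n = 0; n < frame.size(); ++n) {
        const double phase =
            kTwoPi * toneHz * static_cast<double>(n) / sampleRateHz;
        frame[n] = static_cast<std::int16_t>(
            std::lround(kAmplitude * std::sin(phase)));
    }
    return frame;
}

// Hypothetical helper: how long one frame lasts, i.e. the send interval that
// keeps pace with real-time playout (truncated to whole microseconds).
std::int64_t frameDurationMicros(std::size_t sampleCount,
                                 std::uint32_t sampleRateHz) {
    return static_cast<std::int64_t>(sampleCount) * 1000000 / sampleRateHz;
}
```

With these assumed values, a 4096-byte frame holds 2048 samples and lasts roughly 42.7 ms, which is the kind of interval the send timer would be armed with.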

The UDP echo server is based on the asynchronous UDP echo server sample written by the asio author (not me). It is shown below, slightly modified to use a hard-coded 4K buffer for testing purposes. The application is launched with the port parameter 1234, so it listens on port 1234 for the incoming audio packet, which it echoes back to the client.

//
// async_udp_echo_server.cpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2022 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <cstdlib>
#include <iostream>
#include <asio/ts/buffer.hpp>
#include <asio/ts/internet.hpp>

using asio::ip::udp;

class server {
public:
    server(asio::io_context& io_context, short port)
        : socket_(io_context, udp::endpoint(udp::v4(), port)) {
        do_receive();
    }

    void do_receive() {
        socket_.async_receive_from(
            asio::buffer(data_, max_length), sender_endpoint_,
            [this](std::error_code ec, std::size_t bytes_recvd) {
                if (!ec && bytes_recvd > 0) {
                    do_send(bytes_recvd);
                } else {
                    do_receive();
                }
            });
    }

    void do_send(std::size_t length) {
        socket_.async_send_to(
            asio::buffer(data_, length), sender_endpoint_,
            [this](std::error_code /*ec*/, std::size_t /*bytes_sent*/) {
                do_receive();
            });
    }

private:
    udp::socket socket_;
    udp::endpoint sender_endpoint_;
    enum { max_length = 4096 };
    char data_[max_length]{};
};

int main(int argc, char* argv[]) {
    try {
        if (argc != 2) {
            std::cerr << "Usage: async_udp_echo_server <port>\n";
            return 1;
        }

        asio::io_context io_context;

        server s(io_context, std::atoi(argv[1]));

        io_context.run();
    } catch (std::exception& e) {
        std::cerr << "Exception: " << e.what() << "\n";
    }

    return 0;
}

I currently have this working in the client as a standalone asio worker thread. However, since I need to graphically display the returned audio packets, I cannot use the standalone asio thread approach; I need to use Qt and its asynchronous signals/slots mechanism instead.

For illustration, I also include my working asio client code, which runs in a separate joinable thread. This client thread uses an asio::steady_timer to repeatedly send a 4K UDP packet asynchronously to the echo server. The code also compares the echoed contents against the outgoing audio sample, and they match.

void
RTPClient::start() {
    mpSendEndpoint = std::make_unique<ip::udp::endpoint>(
        ip::address::from_string(mConfig.mHostName),
        mConfig.mPortNum);
    mpSocket = std::make_unique<ip::udp::socket>(
        mIOContext, mpSendEndpoint->protocol());
    mpSocketTimer = std::make_unique<steady_timer>(
        mIOContext);
    if (!mShutdownFlag) {
        // kick off the async chain by immediate timeout; queue this before
        // the worker starts so run() has pending work and does not return early
        mpSocketTimer->expires_after(std::chrono::seconds(0));
        mpSocketTimer->async_wait([this]<typename T0>(T0&& ec) {
            handle_timeout(std::forward<T0>(ec));
        });
    }

    mWorker = std::thread([this]() {
        mIOContext.run();
    });
}

void
RTPClient::handle_timeout(const error_code& ec)
{
    if (!ec && !mShutdownFlag) {
        if (!mpAudioOutput) {
            // check to see if there is new audio test data waiting in queue
            if (const auto audioData = mIPCQueue->try_pop(); audioData) {
                // new audio waiting, copy the data to mpAudioOutput and allocate an identically
                // sized receive buffer (mpAudioInput) for the echo replies from the server
                mpAudioInput = std::make_unique<AudioDatagram>(audioData->first.size());
                mpAudioOutput = std::make_unique<AudioDatagram>(std::move(audioData->first));
                mAudioBlockUSecs = audioData->second;
            } else {
                mpSocketTimer->expires_after(seconds(1));
                mpSocketTimer->async_wait([this]<typename T0>(T0&& ec) {
                    handle_timeout(std::forward<T0>(ec));
                });
                // nothing to send as waveform data not received from GUI.
                // short circuit return with a 1 sec poll
                return;
            }
        }
        mpSocket->async_send_to(asio::buffer(
            mpAudioOutput.get(), mpAudioOutput->size()),
            *mpSendEndpoint, [this]<typename T0, typename T1>(T0&& ec, T1&& bytes_transferred) {
                handle_send_to(std::forward<T0>(ec), std::forward<T1>(bytes_transferred));
            });
    }
}

void
RTPClient::handle_send_to(const error_code& ec, std::size_t bytes_transferred) {
    if (!ec && bytes_transferred > 0 && !mShutdownFlag) {
        mpSocketTimer->expires_after(microseconds(mAudioBlockUSecs));
        mpSocketTimer->async_wait([this]<typename T0>(T0&& ec) {
            handle_timeout(std::forward<T0>(ec));
        });

        mpSocket->async_receive_from(asio::buffer(
            mpAudioInput.get(), mpAudioInput->size()), *mpSendEndpoint,
         [this]<typename T0, typename T1>(T0&& ec, T1&& bytes_transferred) {
             handle_receive(std::forward<T0>(ec), std::forward<T1>(bytes_transferred));
         });
    }
}

void
RTPClient::handle_receive(const error_code& ec, std::size_t bytes_transferred) {
    if (!ec && bytes_transferred > 0) {
        double foo = 0.0;
        for (const auto next : *mpAudioOutput) {
            foo += (double)next;
        }
        double bar = 0.0;
        for (const auto next : *mpAudioInput) {
            bar += (double)next;
        }
        if (foo != bar)
        {
            auto baz = 0;
            (void)baz;
        }
    }
}

/**
 * Shutdown the protocol instance by shutting down the IPC
 * queue and closing the socket and associated timers etc.
 *
 * <p>This is achieved by setting a flag which is read by the
 * busy loop as an exit condition.
 */
void
RTPClient::shutdown() {
    // set the shared shutdown flag
    mShutdownFlag = true;

    // wake up any locked threads so they can see the above flag
    if (mIPCQueue) {
        mIPCQueue->shutdown();
    }

    // stop the socket timer - do not reset it
    // as there are some time sensitive parts in the code
    // where mpSocketTimer is dereferenced
    if (mpSocketTimer) {
        mpSocketTimer->cancel();
    }

    std::error_code ignoredError;
    // close the socket if we created & opened it, making
    // sure that we close down both ends of the socket.
    if (mpSocket && mpSocket->is_open()) {
        mpSocket->shutdown(ip::udp::socket::shutdown_both, ignoredError);
        // reset so we will reallocate and then reopen
        // via boost::async_connect(...) later.
        mpSocket.reset();
    }

    // wait for the any other detached threads to see mShutdownFlag
    // as it is running in a detached mWorkerThread which sleeps
    // for 50ms CDU key polling requests.
    std::this_thread::sleep_for(milliseconds(200));
}
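
The comparison in handle_receive sums both buffers, so two frames holding the same samples in a different order would still compare equal. A minimal sketch of an element-wise check follows. It assumes AudioDatagram behaves like a std::vector of 16-bit samples (its definition is not shown in this question), and `sumsMatch` and `echoMatches` are hypothetical names.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// Stand-in for the question's AudioDatagram, whose definition is not shown.
using AudioDatagram = std::vector<std::int16_t>;

// Sum-based check, equivalent in spirit to the foo/bar loops.
bool sumsMatch(const AudioDatagram& sent, const AudioDatagram& received) {
    double a = 0.0;
    double b = 0.0;
    for (auto s : sent) a += s;
    for (auto s : received) b += s;
    return a == b;
}

// Element-wise check: true only if the whole frame came back unchanged.
bool echoMatches(const AudioDatagram& sent, const AudioDatagram& received,
                 std::size_t bytesTransferred) {
    return bytesTransferred == sent.size() * sizeof(AudioDatagram::value_type)
        && sent.size() == received.size()
        && std::equal(sent.begin(), sent.end(), received.begin());
}
```

Passing bytes_transferred in as well lets the check reject a truncated echo, which the sum-based loops would not notice if the missing samples happened to sum to zero.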

I need to replace this separate asio client thread with QUdpSocket-based client code that does the equivalent, because I need signals/slots to notify the GUI when blocks arrive and to display the returned waveform in a widget. To this end I have the following Qt worker thread. I can see that the asio echo server receives the datagram, but I do not know how to receive the echoed contents back in the client. Is there some bind or connect call that I need to make on the client side? I am confused about when to call bind and when to call connect on UDP sockets.

// SYSTEM INCLUDES
//#include <..>

// APPLICATION INCLUDES
#include "RTPSession.h"

// DEFINES
// EXTERNAL FUNCTIONS
// EXTERNAL VARIABLES
// CONSTANTS
// STRUCTS
// FUNCTIONS
// NAMESPACE USAGE
using namespace std::chrono;

// STATIC VARIABLE INITIALIZATIONS
std::mutex RTPSession::gMutexGuard;


RTPSession::RTPSession(QObject* parent)
    : QObject(parent)
    , mpSocket{ std::make_unique<QUdpSocket>(parent) }
{
    mpSocket->bind(45454, QUdpSocket::DefaultForPlatform);
    connect(mpSocket.get(), &QUdpSocket::readyRead,
        this, &RTPSession::processPendingDatagrams);
}

/**
 * Thread function that listens for RTP session updates.
 *
 * <p>The implementation wakes once per second to send a datagram
 * and to check for shutdown.
 *
 * @param rRTPInfo [in] Qt thread parameters.
 */
void
RTPSession::doWork(
    const std::tuple<int32_t, int32_t, int32_t>& /*rRTPInfo*/)
{
    try {
        // just dispatched, so reset exit flag
        mExitWorkLoop = false;
        int frameCounter = 0;
        while (!mExitWorkLoop) {
            constexpr auto gPollMillis = 1000;
            // poll using shortest (non zero) interval in schedule
            std::unique_lock<std::mutex> lk(gMutexGuard);
            mCondVariable.wait_for(lk, milliseconds(gPollMillis),
                [this] { return mExitWorkLoop; });
            QByteArray datagram = "Broadcast message " + QByteArray::number(frameCounter++);
            mpSocket->writeDatagram(datagram.data(), datagram.size(),
                QHostAddress::LocalHost, 1234);

            if (mpSocket->hasPendingDatagrams()) {

                //mpSocket->readDatagram()
                int t = 0;
                (void)t;

            }

            // update GUI with the audio stats - add more later
            // (frameCounter was already incremented when the datagram was built)
            emit updateProgress(frameCounter);
        }
    } catch (const std::exception& rEx) {
        // exit thread with the exception details
        emit finishWork(tr("exiting worker, error:") + rEx.what());
        // do not fall through to the "finished" notification below
        return;
    }
    // exit thread with status bar message
    emit finishWork(tr("finished"));
}

void
RTPSession::shutdown()
{
    // Critical section.
    std::scoped_lock<std::mutex> lock(gMutexGuard);
    mExitWorkLoop = true;
    // Notify the potentially sleeping thread that is
    // waiting for up to 1 second
    mCondVariable.notify_one();
}

void
RTPSession::processPendingDatagrams() {
    QByteArray datagram;
    while (mpSocket->hasPendingDatagrams()) {
        datagram.resize(int(mpSocket->pendingDatagramSize()));
        mpSocket->readDatagram(datagram.data(), datagram.size());
        //statusLabel->setText(tr("Received datagram: \"%1\"")
        //    .arg(datagram.constData()));
    }
}
johnco3
  • Maybe look at https://stackoverflow.com/questions/42714700/how-to-receive-proper-udp-packet-in-qt – Den-Jason Jul 14 '22 at 21:35
  • One other issue - try and minimise how much data / how many transactions you put through the signal/slot mechanism. Queue the bulk of the data through a separate mutex-protected deque – Den-Jason Jul 14 '22 at 21:36
  • This answer explains why in some cases you need to use bind, and why not in others. Basically, if you use sendto before recvfrom, you don't need to bind - https://stackoverflow.com/a/14243544/1607937 – Den-Jason Jul 14 '22 at 21:43
  • At the base level you simply use sendto/recvfrom. See linux.die.net/man/2/recvfrom. Do pay attention to the "UDP receive buffer size", which may default to being quite small. I recommend the book "Linux Socket Programming by Example" – Den-Jason Jul 14 '22 at 21:44
  • @Den-Jason thanks for the sendto explanation. Right now I am in the middle of moving my client code from asio to Qt. The sender side uses asio's mpSocket->async_send_to(...), which I presume uses the sendto API to transmit the first packet to the UDP echo server (so bind should not be required for replies). The echo server gets the IP/port of this client in socket_.async_receive_from(... sender_endpoint_), which I presume is how my pure asio client worked. Now that I am replacing the UDP listening code with a QUdpSocket, I am not sure how to tell Qt to listen on the sender_endpoint_'s port. – johnco3 Jul 14 '22 at 22:32
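
The sendto-before-recvfrom point from the comments can be checked at the plain socket level. The following is a minimal sketch, assuming a POSIX system (Linux or macOS) and the loopback interface. `echoRoundTrip` and `RoundTrip` are hypothetical names, the in-process "server" socket only stands in for the asio echo server, and nothing here is specific to QUdpSocket.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>

#include <cstdint>
#include <string>

struct RoundTrip {
    std::uint16_t clientPort = 0;  // port the OS assigned to the unbound client
    std::string reply;             // payload echoed back to the client
};

// Hypothetical helper: send one datagram from an unbound client socket to a
// loopback echo socket and read the reply back on the same client socket.
RoundTrip echoRoundTrip(const std::string& payload) {
    RoundTrip result;
    const int server = socket(AF_INET, SOCK_DGRAM, 0);
    const int client = socket(AF_INET, SOCK_DGRAM, 0);
    if (server < 0 || client < 0) {
        if (server >= 0) close(server);
        if (client >= 0) close(client);
        return result;
    }

    // Receive timeouts so the sketch cannot block forever.
    timeval tv{};
    tv.tv_sec = 2;
    setsockopt(server, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
    setsockopt(client, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));

    // Server side: explicit bind; port 0 asks the OS for any free port.
    sockaddr_in serverAddr{};
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    socklen_t len = sizeof(serverAddr);
    bind(server, reinterpret_cast<sockaddr*>(&serverAddr), sizeof(serverAddr));
    getsockname(server, reinterpret_cast<sockaddr*>(&serverAddr), &len);

    // Client side: no bind() call. The first sendto() makes the OS pick a
    // local port, which getsockname() can then report.
    sendto(client, payload.data(), payload.size(), 0,
           reinterpret_cast<sockaddr*>(&serverAddr), sizeof(serverAddr));
    sockaddr_in clientAddr{};
    len = sizeof(clientAddr);
    getsockname(client, reinterpret_cast<sockaddr*>(&clientAddr), &len);
    result.clientPort = ntohs(clientAddr.sin_port);

    // Server side: recvfrom() reports the sender's address; echo to it.
    char buf[4096];
    sockaddr_in from{};
    socklen_t fromLen = sizeof(from);
    const ssize_t n = recvfrom(server, buf, sizeof(buf), 0,
                               reinterpret_cast<sockaddr*>(&from), &fromLen);
    if (n > 0) {
        sendto(server, buf, static_cast<std::size_t>(n), 0,
               reinterpret_cast<sockaddr*>(&from), fromLen);
        // Client side: the reply arrives on the implicitly bound port.
        char in[4096];
        const ssize_t m = recvfrom(client, in, sizeof(in), 0, nullptr, nullptr);
        if (m > 0) result.reply.assign(in, static_cast<std::size_t>(m));
    }

    close(server);
    close(client);
    return result;
}
```

Calling echoRoundTrip("ping") is expected to return a non-zero clientPort and the same payload in reply, which is the raw-socket behaviour the asio client relies on. Whether a given framework delivers that reply to application code is a separate matter of how its socket wrapper and event loop are driven.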
