
Description

I would like to read asynchronously from a NetworkStream or SslStream through their abstract Stream parent class. There are different ways to read asynchronously from a stream:

  • Asynchronous Programming Model (APM): It uses BeginRead and EndRead operations.
  • Task Parallel Library (TPL): It uses Task and creates task continuations.
  • Task-based Asynchronous Pattern (TAP): Operations are suffixed with Async, and the async and await keywords can be used.
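To make the three models concrete, here is a rough sketch of the same single read written in each style and run against a MemoryStream. The class ReadStyles and its method names are hypothetical. The APM version bridges to a Task with TaskCompletionSource only so that all three can be consumed the same way; Task.Factory.FromAsync is the standard TPL adapter for a Begin/End pair.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class ReadStyles
{
    // APM: BeginRead starts the read; EndRead, called from the callback, returns the byte count.
    // The TaskCompletionSource only exists so the caller can await the result.
    public static Task<int> ReadWithApm(Stream stream, byte[] buffer)
    {
        var tcs = new TaskCompletionSource<int>();
        stream.BeginRead(buffer, 0, buffer.Length, asyncResult =>
        {
            try { tcs.SetResult(stream.EndRead(asyncResult)); }
            catch (Exception ex) { tcs.SetException(ex); }
        }, null);
        return tcs.Task;
    }

    // TPL: wrap the Begin/End pair in a Task and attach a continuation that runs
    // when the read finishes (real code would process the buffer here).
    public static Task<int> ReadWithContinuation(Stream stream, byte[] buffer)
    {
        Task<int> readTask = Task<int>.Factory.FromAsync(
            stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);
        return readTask.ContinueWith(t => t.Result, TaskScheduler.Default);
    }

    // TAP: the *Async method returns a Task that is awaited directly.
    public static async Task<int> ReadWithTap(Stream stream, byte[] buffer)
    {
        return await stream.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false);
    }
}
```

In new code the TAP form is usually preferred; the other two mainly matter when wrapping older APIs.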

I am mostly interested in using the TAP pattern to implement the asynchronous read. The code below asynchronously reads to the end of the stream and returns the data as a byte array:

    internal async Task<byte[]> ReadToEndAsync(Stream stream)
    {
        byte[] buffer = new byte[4096];
        using (MemoryStream memoryStream = new MemoryStream())
        {
            int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
            while (bytesRead != 0)
            {
                // Received data is appended to the memory stream.
                await memoryStream.WriteAsync(buffer, 0, bytesRead);
                bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
            }

            return memoryStream.ToArray();
        }
    }

The buffer size is 4096 bytes (4 KB). If the transferred data is larger than the buffer, the loop keeps reading until ReadAsync returns 0 (zero), which marks the end of the stream. This works with FileStream, but with NetworkStream or SslStream it hangs indefinitely at the ReadAsync call. These two streams behave differently from other streams.

The problem is that ReadAsync on a network stream only returns 0 (zero) when the socket connection is closed. However, I do not want to close the connection every time data is transferred over the network.
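A small loopback experiment makes this behaviour concrete. It is a sketch under assumptions: both ends run in one process over 127.0.0.1, NetworkReadDemo is a hypothetical name, and the 200 ms wait is an arbitrary threshold. It shows that ReadAsync on a NetworkStream completes as soon as some bytes arrive (not when the buffer is full), stays pending while the connection is open and idle, and returns 0 only after the peer shuts down its sending side.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;

public static class NetworkReadDemo
{
    // Returns the byte count of the first read, whether a second read was still pending
    // after a short wait, and what that pending read returned once the sender shut down.
    public static async Task<(int FirstRead, bool SecondReadWaited, int AfterShutdown)> RunAsync()
    {
        // Loopback listener on an OS-assigned port (port 0).
        var listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();
        int port = ((IPEndPoint)listener.LocalEndpoint).Port;

        using var client = new TcpClient();
        Task connect = client.ConnectAsync(IPAddress.Loopback, port);
        using TcpClient server = await listener.AcceptTcpClientAsync();
        await connect;
        listener.Stop();

        NetworkStream sender = client.GetStream();
        NetworkStream receiver = server.GetStream();

        // Send 5 bytes but ask for up to 4096: the read completes once any data is available.
        byte[] payload = { 1, 2, 3, 4, 5 };
        await sender.WriteAsync(payload, 0, payload.Length);
        byte[] buffer = new byte[4096];
        int firstRead = await receiver.ReadAsync(buffer, 0, buffer.Length);

        // No more data and the connection is still open: this read does not complete.
        Task<int> pending = receiver.ReadAsync(buffer, 0, buffer.Length);
        bool waited = await Task.WhenAny(pending, Task.Delay(200)) != pending;

        // Shutting down the sending side ends the stream, so the pending read returns 0.
        client.Client.Shutdown(SocketShutdown.Send);
        int afterShutdown = await pending;

        return (firstRead, waited, afterShutdown);
    }
}
```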

Question

How can I avoid the indefinitely blocking ReadAsync call without closing the socket connection?

Péter Szilvási
  • If the socket is open but no data is available, you'd expect the `ReadAsync` of a stream representing a connection to that socket to hang indefinitely until either data arrives, the socket is closed or cancellation is triggered. Using `... await ReadAsync(...)` blocks only the waiting task, allowing other tasks to continue processing. – Jodrell Aug 15 '22 at 10:17
  • That is right. But I need the result of the currently received data. Therefore I also have to await the ReadToEndAsync method, which results in blocking. – Péter Szilvási Aug 15 '22 at 10:41
  • Consider creating a mesh with TPL Dataflow, https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library. However, you can't both wait till the end and start processing before the end. You will need concurrent tasks with intermediate buffering. – Jodrell Aug 15 '22 at 10:45
  • @Jodrell Thank you for pointing out the Dataflow programming model. It turns out it can be used with the async and await operators https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-implement-a-producer-consumer-dataflow-pattern – Péter Szilvási Aug 15 '22 at 12:13
  • The TPL Dataflow types are specifically designed for working with TAP async/await. They can save you from writing some risky pipework. My point wasn't about the applicability of Dataflow but rather a need to adjust your perspective a little. – Jodrell Aug 15 '22 at 12:27

2 Answers


I would like to read asynchronously from a NetworkStream or SSLStream using their abstract Stream parent class.

Do you really need to? TCP/IP socket communication is quite complex to do correctly. I strongly recommend self-hosting an HTTP API or something like that instead.

The problem lies behind that the network stream ReadAsync will only return with 0 (zero) when the Socket communication is closed.

Yes, and the code you posted only returns a result when the entire stream has been read.

So, what you need is a different return condition. Specifically, you need to know when a single message has been read from the stream. This is called message framing, as described on my blog.

I also have some sample code on my blog showing one of the simplest message framing solutions (length prefix). Take careful note of the complexity and length of the simplest solution, and consider whether you really want to write socket-level code.

If you do want to continue writing socket applications, I recommend watching my YouTube series on asynchronous TCP/IP socket programming.

Stephen Cleary
  • Yes, we implemented TCP/IP socket communication from the ground up, hence we cannot switch to HTTP easily. Thank you for the YouTube video link, I wish I had come across these videos sooner. I will definitely check them out. – Péter Szilvási Aug 16 '22 at 07:37

As Stephen Cleary's blog post points out, there are two approaches to message framing: length prefixing and delimiters.

  • Length prefixing: the message length is known, because it is sent over the network ahead of the message.
  • Delimiters: the message length is unknown; the end of each message is marked by delimiter characters, which must be escaped if they can also appear inside a message.
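The answer below implements only length prefixing. For comparison, here is a minimal sketch of delimiter framing. It assumes a hypothetical newline delimiter and message bodies that never contain that byte, so no escaping is done. It reads one byte at a time for clarity; a real implementation would read larger chunks and keep any bytes after the delimiter for the next message. The maxLength check addresses the "huge amount of data without delimiters" risk mentioned at the end of this answer.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class DelimiterFraming
{
    // Hypothetical delimiter; assumes message bodies never contain this byte (no escaping).
    public const byte Delimiter = (byte)'\n';

    public static async Task SendAsync(Stream stream, byte[] message)
    {
        await stream.WriteAsync(message, 0, message.Length).ConfigureAwait(false);
        await stream.WriteAsync(new[] { Delimiter }, 0, 1).ConfigureAwait(false);
    }

    // Returns the next message, or null if the stream ends cleanly between messages.
    public static async Task<byte[]> ReceiveAsync(Stream stream, int maxLength)
    {
        using var message = new MemoryStream();
        byte[] single = new byte[1];
        while (true)
        {
            int bytesRead = await stream.ReadAsync(single, 0, 1).ConfigureAwait(false);
            if (bytesRead == 0)
            {
                if (message.Length == 0) return null;
                throw new EndOfStreamException("Connection closed in the middle of a message.");
            }

            if (single[0] == Delimiter) return message.ToArray();

            // Refuse to buffer unbounded data from a peer that never sends a delimiter.
            if (message.Length >= maxLength)
                throw new InvalidDataException("Message exceeds the maximum allowed length.");

            message.WriteByte(single[0]);
        }
    }
}
```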

The code below uses length prefixing: the message length is prepended to the message.

internal async Task SendAsync(Stream stream, byte[] message)
{
    byte[] messageSize = BitConverter.GetBytes(message.Length);
    await stream.WriteAsync(messageSize, 0, messageSize.Length).ConfigureAwait(false);
    await stream.WriteAsync(message, 0, message.Length).ConfigureAwait(false);
}

The sender first converts the message length to a byte array and sends it to the receiver. messageSize.Length is 4, because a signed 32-bit integer is converted. After that, the sender sends the actual message. The receiver side is a little more complicated:
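One caveat: BitConverter.GetBytes uses the byte order of the machine it runs on (see BitConverter.IsLittleEndian), so the code above assumes both peers share the same endianness. If that is not guaranteed, the prefix can be written in a fixed order. A minimal sketch using BinaryPrimitives with big-endian ("network") order is shown below. LengthPrefix, EncodeHeader and DecodeHeader are hypothetical names, and the receiving side would then have to decode with the matching ReadInt32BigEndian instead of BitConverter.ToInt32.

```csharp
using System;
using System.Buffers.Binary;
using System.IO;
using System.Threading.Tasks;

public static class LengthPrefix
{
    // Encode the length as 4 bytes in big-endian order, independent of the host CPU.
    public static byte[] EncodeHeader(int length)
    {
        byte[] header = new byte[4];
        BinaryPrimitives.WriteInt32BigEndian(header, length);
        return header;
    }

    // Decode a 4-byte big-endian header back into the message length.
    public static int DecodeHeader(byte[] header) => BinaryPrimitives.ReadInt32BigEndian(header);

    public static async Task SendAsync(Stream stream, byte[] message)
    {
        byte[] header = EncodeHeader(message.Length);
        await stream.WriteAsync(header, 0, header.Length).ConfigureAwait(false);
        await stream.WriteAsync(message, 0, message.Length).ConfigureAwait(false);
    }
}
```

For example, a length of 258 (0x00000102) is sent as the bytes 00 00 01 02 on every platform.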

internal async Task<byte[]> ReceiveAsync(Stream stream)
{
    byte[] messageBuffer = null;
    byte[] lengthBuffer = new byte[4];

    try
    {
        // A single ReadAsync may return fewer than 4 bytes, so read the prefix with the same loop as the body.
        await ReadToEndAsync(stream, lengthBuffer, lengthBuffer.Length).ConfigureAwait(false);
        int messageLength = BitConverter.ToInt32(lengthBuffer, 0);

        messageBuffer = new byte[messageLength];
        await ReadToEndAsync(stream, messageBuffer, messageLength).ConfigureAwait(false);
    }
    catch (Exception)
    {
        // Error occurred during receiving; log or clean up here before rethrowing.
        throw;
    }

    return messageBuffer;
}

First, the receiver reads the message length into a 4-byte array. Then it converts the received bytes to a signed Int32 value. Once the message length is known, the message body can be read in full.

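private async Task ReadToEndAsync(Stream stream, byte[] messageBuffer, int messageLength)
{
    int offset = 0;
    while (offset < messageLength)
    {
        // Request only the bytes that are still missing, starting at the current offset.
        int bytesRead = await stream.ReadAsync(messageBuffer, offset, messageLength - offset).ConfigureAwait(false);
        if (bytesRead == 0)
        {
            // The socket was closed before the whole message arrived.
            throw new EndOfStreamException("Connection closed before the full message was received.");
        }

        offset += bytesRead;
    }
}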

It reads from the stream until messageLength bytes have been received. If a single read operation does not return the whole message, it advances the buffer offset by the number of bytes read and reads again into the remaining part of the buffer.
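The offset loop is easy to get subtly wrong, so it helps to test it against a stream that returns data in small pieces, as a network stream may. The sketch below is a self-contained version of the receive path, assuming the BitConverter-based prefix from the sender above. LengthPrefixedReceiver, ReadExactlyAsync and the TrickleStream test helper (which returns at most one byte per Read call) are hypothetical names. On .NET 7 and later, Stream.ReadExactlyAsync offers comparable built-in behaviour.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class LengthPrefixedReceiver
{
    // Reads exactly count bytes into buffer, or throws if the stream ends first.
    public static async Task ReadExactlyAsync(Stream stream, byte[] buffer, int count)
    {
        int offset = 0;
        while (offset < count)
        {
            // Ask only for the bytes still missing, starting at the current offset.
            int bytesRead = await stream.ReadAsync(buffer, offset, count - offset).ConfigureAwait(false);
            if (bytesRead == 0)
                throw new EndOfStreamException("Connection closed before the full message arrived.");
            offset += bytesRead;
        }
    }

    public static async Task<byte[]> ReceiveAsync(Stream stream)
    {
        byte[] lengthBuffer = new byte[4];
        await ReadExactlyAsync(stream, lengthBuffer, lengthBuffer.Length).ConfigureAwait(false);
        int messageLength = BitConverter.ToInt32(lengthBuffer, 0);

        byte[] messageBuffer = new byte[messageLength];
        await ReadExactlyAsync(stream, messageBuffer, messageLength).ConfigureAwait(false);
        return messageBuffer;
    }
}

// Hypothetical test helper: returns at most one byte per Read call, like a slow network.
public sealed class TrickleStream : Stream
{
    private readonly Stream _inner;
    public TrickleStream(Stream inner) => _inner = inner;

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public override void Flush() { }
    public override int Read(byte[] buffer, int offset, int count) =>
        _inner.Read(buffer, offset, Math.Min(count, 1));
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}
```

With the original count argument (messageLength instead of messageLength - offset), the second one-byte read would request more bytes than remain in the buffer and ReadAsync would throw an ArgumentException.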

Note on preventing denial-of-service (DoS) attacks:

Whether using length-prefixing or delimiters, one must include code to prevent denial of service attacks. Length-prefixed readers can be given a huge message size; delimiting readers can be given a huge amount of data without delimiters.
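For the length-prefixed reader, one simple safeguard is to validate the untrusted length before allocating the buffer. The sketch below assumes a hypothetical 1 MiB limit (DefaultMaxMessageLength) and a hypothetical GuardedReceiver class; the right limit depends on the protocol. It rejects negative lengths too, since BitConverter.ToInt32 can decode a forged header to a negative value.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class GuardedReceiver
{
    // Hypothetical upper bound (1 MiB); pick a value that fits the protocol.
    public const int DefaultMaxMessageLength = 1024 * 1024;

    public static async Task<byte[]> ReceiveAsync(Stream stream, int maxMessageLength = DefaultMaxMessageLength)
    {
        byte[] lengthBuffer = new byte[4];
        await ReadExactlyAsync(stream, lengthBuffer, lengthBuffer.Length).ConfigureAwait(false);
        int messageLength = BitConverter.ToInt32(lengthBuffer, 0);

        // Validate the untrusted header before allocating, so a forged length
        // cannot make the receiver allocate a huge (or negative-sized) buffer.
        if (messageLength < 0 || messageLength > maxMessageLength)
            throw new InvalidDataException($"Rejected message length {messageLength}.");

        byte[] messageBuffer = new byte[messageLength];
        await ReadExactlyAsync(stream, messageBuffer, messageLength).ConfigureAwait(false);
        return messageBuffer;
    }

    // Reads exactly count bytes, or throws if the stream ends first.
    private static async Task ReadExactlyAsync(Stream stream, byte[] buffer, int count)
    {
        int offset = 0;
        while (offset < count)
        {
            int bytesRead = await stream.ReadAsync(buffer, offset, count - offset).ConfigureAwait(false);
            if (bytesRead == 0)
                throw new EndOfStreamException("Connection closed before the full message was received.");
            offset += bytesRead;
        }
    }
}
```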

Péter Szilvási