In my .NET Core app, a method in a 3rd party library writes to a `System.IO.Stream` (it takes the stream as an argument and writes to it), but I want that data to go to my data source, which expects it as an `IAsyncEnumerable<byte>`. I set about writing code to implement `Stream` so that when `Write()` is called the bytes are surfaced through an `IAsyncEnumerable<byte>`, then thought 'this must have been done before', as it seems like it would be of general-purpose use.

So is there a standard implementation of this in a 3rd party library, or any 'neat trick' I'm missing?

Richard Hunt
  • There is a [`Stream.ReadAsync`](https://learn.microsoft.com/en-us/dotnet/api/system.io.stream.readasync) method that could probably be used to solve this problem, but not all overloads are supported on older .NET platforms. What is the .NET platform you are targeting? Also is it mandatory that a `byte` written to the stream should be immediately surfaced from the `IAsyncEnumerable`, or you are OK with some latency (buffering) to improve performance? – Theodor Zoulias Jul 05 '21 at 12:09
  • Thanks, but `Stream.ReadAsync` would be used for reading the stream, whereas my issue is that something's writing to the stream and I want to put in an implementation of the stream it's writing to which then writes the bytes to my app that's expecting `IAsyncEnumerable`. Developing in modern .Net Core (which I missed from original description!) and don't mind latency. I can write a `Stream` implementation that supports `Write`, I'm just asking if it's already been done as I'd thought "conversion between different types of stream" would be a common requirement. – Richard Hunt Jul 05 '21 at 14:17
  • What is the origin of this `Stream`? Does the 3rd party library expose a `Stream` directly, or does it allow you to pass your own `Stream` implementation as an argument to a method? In the second case you could take a look at [this](https://stackoverflow.com/questions/3721552/implementing-async-stream-for-producer-consumer "Implementing async stream for producer/consumer") question, as a starting step of solving this problem. – Theodor Zoulias Jul 05 '21 at 17:22
  • Thanks for asking, Theodor - it's the second case (I've clarified that in the question). The linked question is interesting, it has some good code fragments. I was asking specifically if there was a standard implementation of this already, but I guess not, so I'll have to write my own. Thanks – Richard Hunt Jul 06 '21 at 08:23

1 Answer

Here is a custom `Stream` implementation, intended for asynchronous producer-consumer scenarios. It is a write-only stream: reading (consuming) it is only possible through the special `GetConsumingEnumerable` method.

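using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Channels;

public class ProducerConsumerStream : Stream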
{
    private readonly Channel<byte> _channel;

    public ProducerConsumerStream(bool singleReader = true, bool singleWriter = true)
    {
        _channel = Channel.CreateUnbounded<byte>(new UnboundedChannelOptions()
        {
            SingleReader = singleReader,
            SingleWriter = singleWriter
        });
    }

    public override bool CanRead { get { return false; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return true; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override void Flush() { }

    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }

    public override long Seek(long offset, SeekOrigin origin)
        => throw new NotSupportedException();

    public override void SetLength(long value)
        => throw new NotSupportedException();

    public override int Read(byte[] buffer, int offset, int count)
        => throw new NotSupportedException();

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (buffer == null) throw new ArgumentNullException(nameof(buffer));
        if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset));
        if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
        // Compare against the remaining length so that offset + count cannot overflow.
        if (count > buffer.Length - offset)
            throw new ArgumentOutOfRangeException(nameof(count));

        for (int i = offset; i < offset + count; i++)
            _channel.Writer.TryWrite(buffer[i]);
    }

    public override void WriteByte(byte value)
    {
        _channel.Writer.TryWrite(value);
    }

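    public override void Close()
    {
        base.Close();
        // Dispose() also calls Close(), so this can run more than once.
        // TryComplete() is a no-op on an already completed channel,
        // whereas Complete() would throw a ChannelClosedException.
        _channel.Writer.TryComplete();
    }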

    public IAsyncEnumerable<byte> GetConsumingEnumerable(
        CancellationToken cancellationToken = default)
    {
        return _channel.Reader.ReadAllAsync(cancellationToken);
    }
}

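This implementation is based on a `Channel<byte>`. The channel is unbounded, so writes never block, and the written bytes accumulate in memory if the consumer is slower than the producer. If you are unfamiliar with channels, there is a tutorial here.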
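
To illustrate the mechanism the stream relies on, here is a minimal, self-contained sketch of the channel round trip on its own: bytes go in through `Writer.TryWrite`, the writer is completed, and the reader is drained with `ReadAllAsync`. The names `ChannelSketch` and `RoundTripAsync` are hypothetical and exist only for this illustration; it assumes .NET Core 3.0 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelSketch
{
    // Hypothetical helper: pushes the input bytes through an unbounded
    // channel and collects what comes out of the IAsyncEnumerable<byte>.
    public static async Task<List<byte>> RoundTripAsync(byte[] input)
    {
        var channel = Channel.CreateUnbounded<byte>();

        // Producer side: on an unbounded channel TryWrite succeeds
        // until the writer has been completed.
        foreach (byte b in input)
            channel.Writer.TryWrite(b);

        // Completing the writer is what allows ReadAllAsync to finish;
        // in the stream above this happens in Close().
        channel.Writer.Complete();

        // Consumer side: the same call that GetConsumingEnumerable returns.
        var result = new List<byte>();
        await foreach (byte b in channel.Reader.ReadAllAsync())
            result.Add(b);
        return result;
    }
}
```

With the stream, the same pattern applies: hand the `ProducerConsumerStream` to the library method, close or dispose it once the library has finished writing, and consume `GetConsumingEnumerable()` with `await foreach`. If the stream is never closed, the enumeration never completes.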

Theodor Zoulias