Here is a custom Stream implementation, intended for asynchronous producer-consumer scenarios. It is a write-only stream: reading (consuming) it is possible only through the special GetConsumingEnumerable method.
/// <summary>
/// A write-only <see cref="Stream"/> backed by an unbounded <see cref="Channel{T}"/> of bytes.
/// Producers push data through the regular <see cref="Stream"/> write API; consumers pull it
/// asynchronously through <see cref="GetConsumingEnumerable"/>.
/// </summary>
public class ProducerConsumerStream : Stream
{
    private readonly Channel<byte> _channel;

    /// <summary>Creates the stream and its underlying unbounded channel.</summary>
    /// <param name="singleReader">Forwarded to <see cref="ChannelOptions.SingleReader"/>.</param>
    /// <param name="singleWriter">Forwarded to <see cref="ChannelOptions.SingleWriter"/>.</param>
    public ProducerConsumerStream(bool singleReader = true, bool singleWriter = true)
    {
        _channel = Channel.CreateUnbounded<byte>(new UnboundedChannelOptions()
        {
            SingleReader = singleReader,
            SingleWriter = singleWriter
        });
    }

    /// <summary>Always <c>false</c>; consume via <see cref="GetConsumingEnumerable"/> instead.</summary>
    public override bool CanRead { get { return false; } }

    /// <summary>Always <c>false</c>.</summary>
    public override bool CanSeek { get { return false; } }

    /// <summary>Always <c>true</c>.</summary>
    public override bool CanWrite { get { return true; } }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override long Length { get { throw new NotSupportedException(); } }

    /// <summary>No-op: written bytes are handed to the channel immediately.</summary>
    public override void Flush() { }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override long Seek(long offset, SeekOrigin origin)
        => throw new NotSupportedException();

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override void SetLength(long value)
        => throw new NotSupportedException();

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override int Read(byte[] buffer, int offset, int count)
        => throw new NotSupportedException();

    /// <summary>
    /// Writes <paramref name="count"/> bytes of <paramref name="buffer"/>, starting at
    /// <paramref name="offset"/>, into the channel one byte at a time.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="offset"/> or <paramref name="count"/> is negative, or the requested range
    /// extends past the end of <paramref name="buffer"/>.
    /// </exception>
    /// <exception cref="ObjectDisposedException">The stream has been closed.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        if (buffer == null) throw new ArgumentNullException(nameof(buffer));
        if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset));
        if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
        // Subtraction form: "offset + count" could overflow int and wrongly pass the check.
        if (buffer.Length - offset < count)
            throw new ArgumentOutOfRangeException(nameof(count));

        int end = offset + count; // cannot overflow after the range check above
        for (int i = offset; i < end; i++)
        {
            Enqueue(buffer[i]);
        }
    }

    /// <summary>Writes a single byte into the channel.</summary>
    /// <exception cref="ObjectDisposedException">The stream has been closed.</exception>
    public override void WriteByte(byte value)
    {
        Enqueue(value);
    }

    /// <summary>
    /// Completes the channel writer when the stream is closed or disposed, so that consumers
    /// finish enumerating after draining the bytes already written.
    /// </summary>
    /// <param name="disposing"><c>true</c> when called from <see cref="Stream.Close"/> or <c>Dispose()</c>.</param>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            // TryComplete (unlike Complete) does not throw when the writer is already
            // completed, which keeps repeated Close()/Dispose() calls safe.
            _channel.Writer.TryComplete();
        }
        base.Dispose(disposing);
    }

    /// <summary>
    /// Returns an asynchronous sequence that yields the written bytes and ends once the stream
    /// has been closed and all remaining bytes have been consumed.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the enumeration.</param>
    public IAsyncEnumerable<byte> GetConsumingEnumerable(
        CancellationToken cancellationToken = default)
    {
        return _channel.Reader.ReadAllAsync(cancellationToken);
    }

    // Hands one byte to the channel. On an unbounded channel TryWrite fails only after the
    // writer has been completed, i.e. after this stream was closed.
    private void Enqueue(byte value)
    {
        if (!_channel.Writer.TryWrite(value))
        {
            throw new ObjectDisposedException(GetType().Name);
        }
    }
}
This implementation is based on a Channel<byte>. If you are unfamiliar with channels, there is a tutorial here.