Using internally a BlockingCollection<>
as a queue, you could write something like:
public class WatitingStream : Stream
{
private BlockingCollection<byte[]> Packets = new BlockingCollection<byte[]>();
private byte[] IncompletePacket;
private int IncompletePacketOffset;
public WatitingStream()
{
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
Packets.CompleteAdding();
}
base.Dispose(disposing);
}
public override bool CanRead
{
get { return Packets.IsCompleted; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return Packets.IsAddingCompleted; }
}
public override void Flush()
{
}
public override long Length
{
get
{
throw new NotSupportedException();
}
}
public override long Position
{
get
{
throw new NotSupportedException();
}
set
{
throw new NotSupportedException();
}
}
public override int Read(byte[] buffer, int offset, int count)
{
if (count == 0)
{
return 0;
}
byte[] packet;
int packetOffset;
if (IncompletePacket != null)
{
packet = IncompletePacket;
packetOffset = IncompletePacketOffset;
}
else
{
if (Packets.IsCompleted)
{
return 0;
}
packet = Packets.Take();
packetOffset = 0;
}
int read = Math.Min(packet.Length - packetOffset, count);
Buffer.BlockCopy(packet, packetOffset, buffer, offset, read);
packetOffset += read;
if (packetOffset < packet.Length)
{
IncompletePacket = packet;
IncompletePacketOffset = packetOffset;
}
else
{
IncompletePacket = null;
IncompletePacketOffset = 0;
}
return read;
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
if (count == 0)
{
return;
}
byte[] packet = new byte[count];
Buffer.BlockCopy(buffer, offset, packet, 0, count);
Packets.Add(packet);
}
}
You use it as a normal stream. The Write
doesn't block. The Read
blocks.
Some decisions had to be taken: this Stream
is "packet" based. It won't Write
zero-length packets, and a Read
will return the data of one packet. The Read
won't continue on the next packet. If there is data remaining in a packet after a Read
, that data is saved for the next Read
. The Dispose()
will stop the Write
(so if the "client" does a Dispose()
before the "server", the server will get an exception if it tries to do a Write
). If the "server" does the Dispose()
first, the "client" can finish reading the packets still present. Clearly it is possible (easy) to split this class in two classes (one Server
and one Client
), where the Server
keeps the BlockingCollection<>
and the Client has a reference to the "server". This would solve the "Dispose()
" anomaly/problem (but would double the code size :-) )