You don't need an alternative type but an alternative way of using Channels. Instead of cancelling the reader, close the writer. This is the technique used in the .NET RabbitMQ client.
Besides, it's far more important to ensure that the payload doesn't require lots of allocations. If you use ArrayPool to reduce allocations you can't just abort the worker method, you need to retrieve the messages and release them.
Channels in RabbitMQ
RabbitMQ uses Channels in the hottest path - when writing message frames to the network stream and when receiving bytes. Few libraries handle more traffic than RabbitMQ. Kestrel itself is one of them, and the use case is similar. Both are expected to handle millions of requests/messages without overhead.
Writing is handled by the WriteLoop
method which runs in a background task:
_writerTask = Task.Run(WriteLoop);
The WriteLoop method itself doesn't cancel. If it did, it would risk leaving a message unfinished.
private async Task WriteLoop()
{
try
{
while (await _channelReader.WaitToReadAsync().ConfigureAwait(false))
{
while (_channelReader.TryRead(out ReadOnlyMemory<byte> memory))
{
MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment);
#if NETSTANDARD
await _writer.WriteAsync(segment.Array, segment.Offset, segment.Count).ConfigureAwait(false);
#else
await _writer.WriteAsync(memory).ConfigureAwait(false);
#endif
RabbitMqClientEventSource.Log.CommandSent(segment.Count);
ArrayPool<byte>.Shared.Return(segment.Array);
}
await _writer.FlushAsync().ConfigureAwait(false);
}
}
catch (Exception ex)
{
ESLog.Error("Background socket write loop has crashed", ex);
throw;
}
}
In this code _writer
is a buffered network stream:
_writer = new BufferedStream(netstream, _socket.Client.SendBufferSize);
The Write method is only a call to _channelWriter.TryWrite
so it doesn't have to check whether the writer is closed or not
public void Write(ReadOnlyMemory<byte> memory)
{
_channelWriter.TryWrite(memory);
}
Cancelling
means calling Close which completes the writer and once the pipeline completes, closes the socket :
try
{
_channelWriter.Complete();
_writerTask?.GetAwaiter().GetResult();
}
catch
{
// ignore, we are closing anyway
}
try
{
_socket.Close();
}
catch
{
// ignore, we are closing anyway
}
finally
{
_closed = true;
}
This ensures that any pending messages are still processed.
It's far more important to ensure that the payload doesn't require lots of allocations. RabbitMQ uses a Channel.CreateUnbounded<ReadOnlyMemory<byte>>
. The buffer itself comes from ArrayPool<byte>.Shared
. Once a frame is sent, the buffer is released back to the pool
ArrayPool<byte>.Shared.Return(segment.Array);
Nuking the Pipeline without leaking buffers
In normal operation you wouldn't need to cancel anyway. You'd only need that if you had to nuke the pipeline instead of stopping the publisher. One such case could be that a downstream worker can't process messages any more, because a network connection was terminated. In this case, calling Complete
on the publisher could still leave messages in the Channel that can no longer be processed.
The cooperative way to handle this is to discard the messages if cancellation is signaled, not cancel the Read
operation itself. If we use ArrayPool we must receive those buffers to release them anyway.
The RabbitMQ code doesn't work this way, but handling this needs only a simple change in the inner loop:
while (await _channelReader.WaitToReadAsync().ConfigureAwait(false))
{
while (_channelReader.TryRead(out ReadOnlyMemory<byte> memory))
{
MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment);
if(!token.IsCancellationRequested)
{
// Normal processing
}
ArrayPool<byte>.Shared.Return(segment.Array);
}
await _writer.FlushAsync().ConfigureAwait(false);
}
This ensures the worker method stops processing messages immediately without leaking buffers.
Structs and multi-threaded workers in RabbitMQ
RabbitMQ uses the same pattern in the ConsumerDispatcherChannelBase class. In this case the payload is a readonly struct WorkStruct which also holds a pooled buffer, RentedArray.
The difference this time is that there can be multiple consumers for the same RabbitMQ connection, so the class can start multiple worker tasks :
Func<Task> loopStart = ProcessChannelAsync;
if (concurrency == 1)
{
_worker = Task.Run(loopStart);
}
else
{
var tasks = new Task[concurrency];
for (int i = 0; i < concurrency; i++)
{
tasks[i] = Task.Run(loopStart);
}
_worker = Task.WhenAll(tasks);
}
Shutting down works the same though. Complete the writer and wait for the workers to exit :
protected override Task InternalShutdownAsync()
{
_writer.Complete();
return _worker;
}