
Suppose I have a many-producers, single-consumer unbounded Channel, with a consumer like this:

await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
    await consume(message);
}

The problem is that the consume function does some I/O, and potentially some network access too, so before one message is consumed many more may be produced. Since the I/O resources can't be accessed concurrently, I can't have multiple consumers, nor can I fire off the consume function as a Task and forget it.

The consume function can easily be modified to take multiple messages and handle them all in one batch. So my question is whether there's a way to make the consumer take all the messages currently in the channel's queue whenever it reads, something like this:

while (true) {
    Message[] messages = await channel.Reader.TakeAll();
    await consumeAll(messages);
}

Edit: One option I can come up with is:

List<Message> messages = new();
await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
    await consume(message);
    Message msg;
    while (channel.Reader.TryRead(out msg))
        messages.Add(msg);
    if (messages.Count > 0)
    {
        await consumeAll(messages);
        messages.Clear();
    }
}

But I feel like there should be a better way to do this.

Guiorgy
  • Looking at the docs... if you wait on a [`ReadAsync`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels.channelreader-1.readasync?view=net-6.0), then, immediately after, repeatedly call [`TryRead`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels.channelreader-1.tryread?view=net-6.0) until it returns `false`, it will be equivalent to `TakeAll` or `Flush`, right? (A sketch of this pattern follows this thread.) – spender Jan 13 '22 at 12:56
  • Funny, I just thought of something similar. For now I'll wait to see if there's a better option. Perhaps I should use something other than `Channel` here. – Guiorgy Jan 13 '22 at 12:59
  • [`System.Threading.Tasks.Dataflow.BufferBlock`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.bufferblock-1?view=net-6.0) might be to your liking. – spender Jan 13 '22 at 13:01
  • ...and a question I asked 11 years (!!!) ago... https://stackoverflow.com/questions/7863573/awaitable-task-based-queue – spender Jan 13 '22 at 13:02
  • Is the [`Buffer`](https://github.com/dotnet/reactive/blob/main/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Buffer.cs) operator, from the [System.Interactive.Async](https://www.nuget.org/packages/System.Interactive.Async/) package, helpful? – Theodor Zoulias Jan 13 '22 at 13:23
  • `BufferBlock` would probably work if `OutputAvailableAsync` was used before `TryReceiveAll`, which hands back an `IList<T>`, exactly what I want. Though it's a bit unfortunate that `BufferBlock` is **several** times slower than `Channel` ([source](https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/)), so I am a bit reluctant to use it instead. – Guiorgy Jan 13 '22 at 13:25
  • @TheodorZoulias From what I can see, the `Buffer` implementation is, as the name implies, a buffer that keeps awaiting until a certain number of elements have been gathered, then yields them as a batch. It's an interesting concept, but ideally I want to consume messages as soon as they arrive, and only handle them in a batch if there happen to be more than one of them. – Guiorgy Jan 13 '22 at 13:31
  • I see. How about this, a `Buffer` with a `TimeSpan` parameter, so that you can configure exactly how much time you want to wait before processing a batch? [Feature Request: Buffer with TimeSpan overload](https://github.com/dotnet/reactive/issues/1679) – Theodor Zoulias Jan 13 '22 at 13:43
  • @TheodorZoulias That would actually also be a reasonable solution. Unfortunately, it's still at the feature request stage. – Guiorgy Jan 13 '22 at 14:10
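
A minimal sketch of the `ReadAsync`-then-`TryRead` pattern from this thread (the `TakeAllAsync` helper and its name are hypothetical, not part of the Channels API, and it would need to live in a static class):

public static async ValueTask<List<T>> TakeAllAsync<T>(
    this ChannelReader<T> reader, CancellationToken cancellationToken = default)
{
    // Wait asynchronously until at least one message is available.
    var batch = new List<T> { await reader.ReadAsync(cancellationToken) };
    // Then synchronously drain whatever else is already queued.
    while (reader.TryRead(out var item)) batch.Add(item);
    return batch;
}

And a similar sketch of the `BufferBlock` alternative, pairing `OutputAvailableAsync` with `TryReceiveAll` (requires the System.Threading.Tasks.Dataflow package; the `Message`, `consumeAll` and `cts` names from the question are assumed):

var block = new BufferBlock<Message>();
// Producers call block.Post(message) instead of writing to a channel.
while (await block.OutputAvailableAsync(cts.Token))
{
    // Assumes consumeAll accepts an IList<Message>.
    if (block.TryReceiveAll(out var batch))
        await consumeAll(batch);
}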

2 Answers


After reading Stephen Toub's primer on channels, I had a stab at writing an extension method that should do what you need (it's been a while since I did any C#, so this was fun).

using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;

public static class ChannelReaderEx
{
    public static async IAsyncEnumerable<IEnumerable<T>> ReadBatchesAsync<T>(
        this ChannelReader<T> reader, 
        [EnumeratorCancellation] CancellationToken cancellationToken = default
    )
    {
        while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        {
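            // ToList() matters here: Flush() is a lazy iterator, so materializing
            // it drains the channel fully before the batch is yielded downstream.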
            yield return reader.Flush().ToList();
        }
    }

    public static IEnumerable<T> Flush<T>(this ChannelReader<T> reader)
    {
        while (reader.TryRead(out T item))
        {
            yield return item;
        }
    }
}

which can be used like this:

await foreach (var batch in channel.Reader.ReadBatchesAsync())
{
    await ConsumeBatch(batch);
}
spender

Solving this problem at the ChannelReader<T> level, as in spender's excellent answer, is practical and sufficient, but solving it at the IAsyncEnumerable<T> level might yield a solution with a broader range of applications. Below is an extension method BufferImmediate for asynchronous sequences, which yields non-empty buffers containing all the elements that are immediately available at the moment the sequence is pulled:

/// <summary>
/// Splits the sequence into chunks that contain all the elements that are
/// immediately available.
/// </summary>
public static IAsyncEnumerable<IList<TSource>> BufferImmediate<TSource>(
    this IAsyncEnumerable<TSource> source, int maxSize = -1)
{
    ArgumentNullException.ThrowIfNull(source);
    if (maxSize == -1) maxSize = Array.MaxLength;
    if (maxSize < 1) throw new ArgumentOutOfRangeException(nameof(maxSize));
    return Implementation();

    async IAsyncEnumerable<IList<TSource>> Implementation(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        using var linkedCts = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);
        var enumerator = source.GetAsyncEnumerator(linkedCts.Token);
        ValueTask<bool> moveNext = default;
        try
        {
            moveNext = enumerator.MoveNextAsync();
            List<TSource> buffer = new();
            TSource[] ConsumeBuffer()
            {
                TSource[] array = buffer.ToArray();
                buffer.Clear();
                return array;
            }
            ExceptionDispatchInfo error = null;
            while (true)
            {
                if (!moveNext.IsCompleted && buffer.Count > 0)
                    yield return ConsumeBuffer();
                TSource item;
                try
                {
                    if (!await moveNext.ConfigureAwait(false)) break;
                    item = enumerator.Current;
                }
                catch (Exception ex)
                {
                    error = ExceptionDispatchInfo.Capture(ex); break;
                }
                finally { moveNext = default; } // The ValueTask is consumed.
                buffer.Add(item);
                if (buffer.Count == maxSize) yield return ConsumeBuffer();
                try { moveNext = enumerator.MoveNextAsync(); }
                catch (Exception ex)
                {
                    error = ExceptionDispatchInfo.Capture(ex); break;
                }
            }
            if (buffer.Count > 0) yield return ConsumeBuffer();
            error?.Throw();
        }
        finally
        {
            if (!moveNext.IsCompleted)
            {
                // The last moveNext must be completed before disposing.
                // Cancel the enumerator, for more responsive completion.
                // Surface any error through the
                // TaskScheduler.UnobservedTaskException event.
                // Avoid throwing on DisposeAsync.
                try { linkedCts.Cancel(); }
                catch (Exception ex) { _ = Task.FromException(ex); }
                await Task.WhenAny(moveNext.AsTask()).ConfigureAwait(false);
            }
            await enumerator.DisposeAsync().ConfigureAwait(false);
        }
    }
}

Usage example:

await foreach (var batch in channel.Reader.ReadAllAsync().BufferImmediate())
{
    await ConsumeBatch(batch);
}

The above implementation is non-destructive, meaning that no elements that have been consumed from the source sequence can be lost. In case the source sequence fails or the enumeration is canceled, any buffered elements will be emitted before the propagation of the error.

Note: this implementation uses the same List<TSource> as a buffer during the whole enumeration. In case the buffer becomes oversized at some point during the enumeration, it will remain oversized until the end of the enumeration. In case this is a problem, you can either create a new List<TSource> after emitting each batch, or look at this answer for a more sophisticated solution (shrinking the buffer).
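
A minimal sketch of the first option (a tweak to the ConsumeBuffer local function above, not part of the original implementation): reassigning the buffer lets the oversized backing array be garbage collected, at the cost of one extra allocation per batch.

TSource[] ConsumeBuffer()
{
    TSource[] array = buffer.ToArray();
    // Replace the list instead of calling buffer.Clear(), so the old
    // oversized backing array becomes eligible for collection.
    buffer = new List<TSource>();
    return array;
}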

Theodor Zoulias