
The complete reproducible code is on GitHub; memory usage rockets soon after the executable is launched. The code resides mostly in the AsyncBlockingQueue.cs class.

The following code implements a simple async "blocking" queue:

        public async Task<T> DequeueAsync(
            int timeoutInMs = -1,
            CancellationToken cancellationToken = default)
        {
            try
            {
                using (CancellationTokenSource cts = this.GetCancellationTokenSource(timeoutInMs, cancellationToken))
                {
                    T value = await this._channel.Reader.ReadAsync(cts?.Token ?? cancellationToken).ConfigureAwait(false);
                    return value;
                }
            }
            catch (ChannelClosedException)
            {
                await Console.Error.WriteLineAsync("Channel is closed.");
                throw new ObjectDisposedException("Queue is disposed");
            }
            catch (OperationCanceledException)
            {
                throw;
            }
            catch (Exception)
            {
                await Console.Error.WriteLineAsync("Dequeue failed.");
                throw;
            }
        }


        private CancellationTokenSource GetCancellationTokenSource(
            int timeoutInMs,
            CancellationToken cancellationToken)
        {
            if (timeoutInMs <= 0)
            {
                return null;
            }

            CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            cts.CancelAfter(TimeSpan.FromMilliseconds(timeoutInMs));
            return cts;
        }

When used in the following way, it leaks memory:

try
{
    string message = await this._inputQueue.DequeueAsync(10, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
    // timeout
}

(Screenshot: memory usage climbing steadily after launch.)

HooYao
  • It seems an old topic, but it still confuses me and leaks memory. – HooYao May 17 '21 at 16:31
  • It's by design: https://github.com/dotnet/runtime/issues/761 – HooYao May 18 '21 at 23:54
  • Channels don't leak when used properly. A channel isn't a queue, it's a different container with different usage idioms. All of the code you wrote is essentially just a call to `_channel.Reader.ReadAsync`. The rest is trying to handle problems introduced by the very existence of `DequeueAsync`. – Panagiotis Kanavos May 19 '21 at 15:03
  • `AsyncBlockingQueue` is self-contradictory and definitely the *opposite* of what a Channel is. All channel operations are non-blocking. You can think of a Channel as an `AsyncQueue` except it's not. There's a very good reason there are separate `ChannelReader` and `ChannelWriter` classes. – Panagiotis Kanavos May 19 '21 at 15:07
  • @PanagiotisKanavos Logically "blocking", not under the hood; I needed a queue that behaves like a blocking queue in an async scenario. The design of keeping readers enqueued even when cancelled causes the above issue. I talked to Stephen Toub; they are sticking to the design for now. – HooYao May 19 '21 at 16:21
  • As they should. Because that's not what channels are at all. Channels are higher-level constructs that *use* queues. – Panagiotis Kanavos May 19 '21 at 16:28
  • How are you trying to use this class? You'd only notice an issue if you tried to *poll* the channel for a long time, which is definitely not how async programming works. Clients are meant to use `await` and continue with the data once it's available. Awaiting is an alternative to polling – Panagiotis Kanavos May 20 '21 at 14:29
  • The reason memory increases isn't a leak. Channels guarantee order, which means `ReadAsync` operations have to be processed in the order they are made. When you make a `ReadAsync` call, an `AsyncOperation` object is queued until something is posted to the channel. When you cancel the operation, it's not removed from the queue - it's a queue, only the head can be dequeued. Which happens only when a new item is written. There's no reason to do so earlier because *channels aren't meant to be polled*. – Panagiotis Kanavos May 20 '21 at 16:41
  • If you really want to poll the channel (why??) use [TryRead](https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels.channelreader-1.tryread?view=net-5.0). Why do that at all though, when `await ReadAsync` will notify the caller immediately when new data arrives? This isn't a blocking call. No thread is frozen – Panagiotis Kanavos May 20 '21 at 16:48
  • Can you set the [SingleReader](https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels.channeloptions.singlereader?view=net-5.0) option for the channel? When there's only a single reader, the channel won't queue async operations, it will use a single instance as a single reader can only make a single Read request at a time – Panagiotis Kanavos May 20 '21 at 17:06
  • Please explain the *real* problem. You're trying to build an async queue with a timeout on `ReadAsync`, but why? Knowing the requirements will help suggest a solution for your specific problem. Do you want to use polling (why)? Why poll a queue instead of the queue notifying consumers? Is there one or many consumers? If there are many, should they receive data in the order *they requested it*? That's what Channel does, which caused you problems. Or is it OK to serve any active request? That could be tricky, which is why AsyncOperations are used to represent all pending requests. – Panagiotis Kanavos May 21 '21 at 06:15
  • @PanagiotisKanavos this issue speaks for itself: https://github.com/dotnet/runtime/issues/761. For me, I'm passing messages via the channel and I need it to behave like a blocking queue; please don't argue about the "blocking" word again, I don't care what's under the hood (state machine, IOCP, epoll, kqueue). On the other end of the channel there is a processor that processes the messages in batches. It starts to process when there are enough messages or time is up; that's where the timeout cancellation comes up. In the issue, people with the same use cases are having the same problem. – HooYao May 21 '21 at 16:10
  • @PanagiotisKanavos I have talked to Stephen Toub; he said they would look into it. I have figured out a nasty way to solve it; I will post it later. – HooYao May 21 '21 at 16:12
  • Sounds like you want a batch operation then, not polling. Add that to the question itself. It's relatively easy to create such a function that reads items from a ChannelReader, stores them in e.g. a List, and sends the entire List to a target channel when either the count or the timeout is reached. That function only needs a `Timer` that fires periodically. Doing either alone (batch by count or by period) is *very* easy. Combining both so they don't get in each other's way is a bit more work – Panagiotis Kanavos May 21 '21 at 16:43
  • The question has already received answers, so it would be inadvisable to change the question in a way that would invalidate existing answers. This question is about the channels leaking memory in some scenario (it is tagged with the [memory-leaks](https://stackoverflow.com/questions/tagged/memory-leaks) tag). If the OP wants ideas/suggestions about how to implement some desirable functionality, they should post a new question IMHO. – Theodor Zoulias May 21 '21 at 18:20

3 Answers


Update

From the comments:

there is a processor to process the messages in batch. it starts to process when there are enough messages or time is up, that's where the timeout cancellation comes up

This means that what's really needed is a way to batch messages by both count and period. Doing either alone is relatively easy.

This method batches by count. It adds messages to the batch list until the limit is reached, sends the batch downstream, and clears the list:

static ChannelReader<Message[]> BatchByCount(this ChannelReader<Message> input, int count, CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<Message[]>();
    var writer=channel.Writer;

    _ = Task.Run(async ()=>{
        var batch=new List<Message>(count);
        await foreach(var msg in input.ReadAllAsync(token))
        {
            batch.Add(msg);
            if(batch.Count==count)
            {
                await writer.WriteAsync(batch.ToArray());
                batch.Clear();
            }
        }
        //Flush any partial batch left over when the input completes
        if(batch.Count>0)
        {
            await writer.WriteAsync(batch.ToArray());
        }
    },token)
    .ContinueWith(t=>writer.TryComplete(t.Exception));
    return channel;
}
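
For context, consuming the batched output might look like this (a sketch; `messageReader` and `ProcessBatch` are hypothetical names, not from the question):

ChannelReader<Message[]> batches = messageReader.BatchByCount(10);
await foreach (Message[] batch in batches.ReadAllAsync())
{
    ProcessBatch(batch); //hypothetical downstream processing
}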

A method that batches by period is more complicated, as the timer can fire at the same time a message is received. Interlocked.Exchange replaces the existing batch list with a new one and sends the batched data downstream:

static ChannelReader<Message[]> BatchByPeriod(this ChannelReader<Message> input, TimeSpan period, CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<Message[]>();
    var writer=channel.Writer;

    var batch=new List<Message>();
    var timer=new Timer(_=>{
        //Swap in a fresh list and send the old one downstream
        var data=Interlocked.Exchange(ref batch,new List<Message>());
        if(data.Count>0)
        {
            //TryWrite always succeeds on an unbounded channel
            writer.TryWrite(data.ToArray());
        }
    },null,TimeSpan.Zero,period);

    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {
            batch.Add(msg);
        }
    },token)
    .ContinueWith(t=>{
        timer.Dispose();
        writer.TryComplete(t.Exception);
    });
    return channel;
}

To do both - I'm still working on that. The problem is that the count limit and the timer expiration can fire at the same time. Worst case, lock(batch) can be used to ensure that only the timer callback or the read loop sends the data downstream at any given moment.
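
For illustration only, here's a sketch of what the combined version might look like, using the lock described above (the method name, the Flush helper and the overall structure are mine, not a tested implementation):

static ChannelReader<Message[]> BatchByCountOrPeriod(
    this ChannelReader<Message> input, int count, TimeSpan period, CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<Message[]>();
    var writer = channel.Writer;
    var batch = new List<Message>(count);

    //Called by both the timer and the read loop; the lock ensures
    //only one of them sends a given batch downstream
    void Flush()
    {
        Message[] data = null;
        lock (batch)
        {
            if (batch.Count > 0)
            {
                data = batch.ToArray();
                batch.Clear();
            }
        }
        if (data != null) writer.TryWrite(data); //always succeeds on an unbounded channel
    }

    var timer = new Timer(_ => Flush(), null, period, period);

    _ = Task.Run(async () =>
    {
        await foreach (var msg in input.ReadAllAsync(token))
        {
            bool full;
            lock (batch)
            {
                batch.Add(msg);
                full = batch.Count >= count;
            }
            if (full) Flush();
        }
        Flush(); //flush the remainder when the input completes
    }, token)
    .ContinueWith(t =>
    {
        timer.Dispose();
        writer.TryComplete(t.Exception);
    });

    return channel;
}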

Original Answer

Channels don't leak when used properly - just like any other container. A Channel isn't an asynchronous queue and definitely not a blocking one. It's a very different construct, with completely different idioms. It's a higher-level container that uses queues. There's a very good reason there are separate ChannelReader and ChannelWriter classes.

The typical scenario is to have a publisher create and own the channel. Only the publisher can write to that channel and call Complete() on it. Channel doesn't implement IDisposable so it can't be disposed. The publisher only provides a ChannelReader to subscribers.

Subscribers only see a ChannelReader and read from it until it completes. By using ReadAllAsync a subscriber can keep reading from a ChannelReader until it completes.

This is a typical example:

ChannelReader<Message> Producer(CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<Message>();
    var writer=channel.Writer;

    //Create the actual "publisher" worker
    _ = Task.Run(async ()=>{
        for(int i=0;i<100;i++)
        {
            //Check for cancellation
            if(token.IsCancellationRequested)
            {
                return;
            }
            //Simulate some work
            await Task.Delay(100);
            await writer.WriteAsync(new Message(...));          
        }
    }  ,token)
    //Complete and propagate any exceptions
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    //This casts to a ChannelReader
    return channel;
}

The subscriber only needs a ChannelReader to work. By using ChannelReader.ReadAllAsync the subscriber only needs await foreach to process messages:

async Task Subscriber(ChannelReader<Message> input,CancellationToken token=default)
{
    await foreach(var msg in input.ReadAllAsync(token))
    {
        //Use the message
    }
}

The subscriber can produce its own messages by returning a ChannelReader. And this is where things become very interesting, as the Subscriber method becomes a step in a pipeline of chained steps. If we convert the methods to extension methods on ChannelReader we can easily create an entire pipeline.

Let's generate some numbers:

ChannelReader<int> Generate(int nums,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<int>(10);
    var writer=channel.Writer;

    //Create the actual "publisher" worker
    _ = Task.Run(async ()=>{
        for(int i=0;i<nums;i++)
        {
            //Check for cancellation
            if(token.IsCancellationRequested)
            {
                return;
            }

            await writer.WriteAsync(i*7);  
            await Task.Delay(100);        
        }
    }  ,token)
    //Complete and propagate any exceptions
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    //This casts to a ChannelReader
    return channel;
}

Then double them and take their square root:

static ChannelReader<double> Double(this ChannelReader<int> input,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<double>(10);
    var writer=channel.Writer;

    //Create the actual "publisher" worker
    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {
            await writer.WriteAsync(2.0*msg);          
        }
    }  ,token)
    //Complete and propagate any exceptions
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel;
}

static ChannelReader<double> Root(this ChannelReader<double> input,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<double>(10);
    var writer=channel.Writer;

    //Create the actual "publisher" worker
    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {
            await writer.WriteAsync(Math.Sqrt(msg));          
        }
    }  ,token)
    //Complete and propagate any exceptions
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel;
}

And finally, print them:

static async Task Print(this ChannelReader<double> input,CancellationToken token=default)
{
    await foreach(var msg in input.ReadAllAsync(token))
    {
        Console.WriteLine(msg);
    }
}

Now we can build a pipeline:

await Generate(100)
          .Double()
          .Root()
          .Print();

And add a cancellation token to all steps:

using var cts=new CancellationTokenSource();
await Generate(100,cts.Token)
          .Double(cts.Token)
          .Root(cts.Token)
          .Print(cts.Token);

Memory usage can increase if one step produces messages faster than they are consumed for a long time. This is easily handled by using a bounded channel instead of an unbounded one. This way, if a step is too slow, all the previous steps will have to await before publishing new data.
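
For example, a minimal sketch of a bounded channel (Wait is in fact the default FullMode; it's spelled out here for clarity):

var channel = Channel.CreateBounded<Message>(new BoundedChannelOptions(10)
{
    FullMode = BoundedChannelFullMode.Wait //WriteAsync awaits while the buffer is full
});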

Panagiotis Kanavos
  • Panagiotis using the [primitive](https://blog.stephencleary.com/2013/10/continuewith-is-dangerous-too.html) `ContinueWith` method indicates that the example is not production-ready. Also honestly I think that your answer is not particularly relevant to the OP's question. The OP is trying to solve a problem, made the assumption that a `Channel` would be a good tool to solve the problem, and found that using it resulted in an unexpected memory leak. Teaching them how to use properly a `Channel`, is unlikely to help them solve the problem at hand. – Theodor Zoulias May 19 '21 at 17:33
  • @TheodorZoulias have you worked with channels in production? If you did, you'd understand what that `ContinueWith` is doing. And `you're using the wrong tool` is the best answer when one is using the wrong tool, or explaining why the .NET team won't modify Channels to accommodate improper use. And `show, don't tell` is a lot better than saying `you're doing it wrong` – Panagiotis Kanavos May 20 '21 at 05:55
  • Panagiotis OK, me saying that it's "not production-ready" was probably a bit out of place. The issue with using the `ContinueWith` without specifying explicitly the `TaskScheduler` is not tied with the channels in any particular way. Also spawning fire-and-forget tasks feels somewhat wrong. It creates the possibility of unobserved exceptions, although admittedly this possibility is minuscule in your example. Regarding the channels being the wrong tool for the job, what could be the right tool? Do you know of any async queue offering cancellation and timeout support, that doesn't leak memory? – Theodor Zoulias May 20 '21 at 07:10
  • I've now read more carefully your pipeline suggestion. If you want to know my opinion, trying to make a robust pipeline framework based on `ChannelReader`s is a lost cause. This class is not disposable, so it cannot take advantage of the syntax support for disposing enumerables after a `foreach` loop. This means that an error in the `Print` operator cannot be easily propagated backwards to cancel the `Task.Run` tasks spawned by the `Generate`, `Double` and `Square` operators. These tasks will get stuck, they will not be recycled, and they will stay in memory until the process terminates. – Theodor Zoulias May 20 '21 at 13:49
  • @TheodorZoulias and yet, Go works just fine with Channels. Goroutines in Go use only channel readers (`<-chan`) and channel writers (`chan<-`), never the channel itself. The producer owns the channel and is the only one allowed to write to it; everyone else gets a reader. It's not an accident the .NET class is called the same. The comments make several mistaken assumptions, most likely due to unfamiliarity with Channels in general – Panagiotis Kanavos May 20 '21 at 14:00
  • @TheodorZoulias check [Go Concurrency Patterns: Pipelines and cancellation](https://blog.golang.org/pipelines) from the Go language blog. It explains how channels are used in pipelines – Panagiotis Kanavos May 20 '21 at 14:40
  • I tried to read the article, but I have a hard time following it because I'm not familiar with the Go language. What is the C# equivalent of a goroutine, a fire-and-forget `Task.Run`? Btw in what aspect would you say that a `Channel` differs from a `BufferBlock`? The latter allows just as easily the distinction between a writer and a reader, via its `ITargetBlock` and `ISourceBlock` interfaces. My understanding is that the selling point of the channels is speed and efficiency, and allowing a potentially huge memory leak in a reasonable scenario defeats its main purpose of existence. – Theodor Zoulias May 20 '21 at 16:53
  • @TheodorZoulias there's no leak. There's bad usage. If you put 100K cancelled requests into a queue meant to be processed only when new data arrives, it's not the queue's fault. Have you read [Stephen Toub's](https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/) or [Steven Gordon's](https://www.stevejgordon.co.uk/an-introduction-to-system-threading-channels) articles on Channels? Especially the [Internals on ChannelReader](https://www.stevejgordon.co.uk/dotnet-internals-system-threading-channels-unboundedchannelt-part-3) which explain about async operations – Panagiotis Kanavos May 20 '21 at 16:55
  • Why should I delve into the internals of the `ChannelReader`? To learn how to write leaky code? Implementing an asynchronous queue is a solved problem for at least a decade, and creating a new one that is more efficient, except when it's not, doesn't make much sense. – Theodor Zoulias May 21 '21 at 03:28
  • @TheodorZoulias to clear up misconceptions. What you say up to now is no different than someone blaming Microsoft because an application encountered problems after opening 10 database connections per second without closing them. – Panagiotis Kanavos May 21 '21 at 05:53
  • @TheodorZoulias `why should I delve` because that's how I found that `SingleReader` optimizes AsyncOperation usage. A *lot* of problematic answers could be improved by actually reading the fabulous manuals, articles and courses. I'm pretty sure Stephen Toub, David Fowler, Marc Gravell, Steve Gordon aren't idiots. They produce very fast software. Why assume *they* did something dumb? It's more logical to assume *we* don't understand how things are used – Panagiotis Kanavos May 21 '21 at 06:03
  • @TheodorZoulias if you want to understand the CSP paradigm and how Channels and TPL Dataflow should be used, learn Go, read the pipeline blog post and the patterns in the book [Concurrency in Go](https://www.oreilly.com/library/view/concurrency-in-go/9781491941294/) . CSP is a *very* different way of working. Both DataFlow and Channels are CSP constructs, not workers or agents. There are a few meetup and conference talks about TPL DataFlow as well, that would help clear up the confusion – Panagiotis Kanavos May 21 '21 at 06:07
  • Panagiotis even the smartest people can write buggy code, or fail to predict how the tools they build will be used by other people, or become lazy when it comes to documenting the behavior of their complex algorithms. Anyway, I see that you are very passionate at defending the status quo of the `ChannelReader.ReadAsync` method, so I won't prolong the argument. I think that we've both made our points clear. – Theodor Zoulias May 21 '21 at 06:47
  • @PanagiotisKanavos the problem is quite obvious from the bounded channel source code: `var reader = new AsyncOperation(parent._runContinuationsAsynchronously | cancellationToken.CanBeCanceled, cancellationToken); parent._blockedReaders.EnqueueTail(reader); return reader.ValueTaskOfT;` The reader is enqueued even if it's cancelled. – HooYao May 21 '21 at 16:15
  • @HooYao it's not *removed* from the queue once cancelled. Because it's a queue. Which is necessary to preserve the order of reads. Again, that's perfectly reasonable given what a `Channel` is for: a bridge between publishers and consumers, using a `pull` model, not a standalone collection with polling. The designers went out of their way to make it *hard* to use a Channel as a single container. Besides, your actual problem is already handled by different libraries. Which, if misused, will also result in increasing memory usage – Panagiotis Kanavos May 24 '21 at 09:07

I was able to reproduce the issue you are observing. It is clearly a flaw in the Channels library IMHO. Here is my repro:

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Program
{
    public static async Task Main()
    {
        var channel = Channel.CreateUnbounded<int>();
        var bufferBlock = new BufferBlock<int>();
        var asyncCollection = new Nito.AsyncEx.AsyncCollection<int>();
        var mem0 = GC.GetTotalMemory(true);
        int timeouts = 0;
        for (int i = 0; i < 10; i++)
        {
            var stopwatch = Stopwatch.StartNew();
            while (stopwatch.ElapsedMilliseconds < 500)
            {
                using var cts = new CancellationTokenSource(1);
                try
                {
                    await channel.Reader.ReadAsync(cts.Token);
                    //await bufferBlock.ReceiveAsync(cts.Token);
                    //await asyncCollection.TakeAsync(cts.Token);
                }
                catch (OperationCanceledException) { timeouts++; }
            }
            var mem1 = GC.GetTotalMemory(true);
            Console.WriteLine($"{i + 1,2}) Timeouts: {timeouts,5:#,0},"
                + $" Allocated: {mem1 - mem0:#,0} bytes");
        }
    }
}

Output:

 1) Timeouts:   124, Allocated: 175,664 bytes
 2) Timeouts:   250, Allocated: 269,720 bytes
 3) Timeouts:   376, Allocated: 362,544 bytes
 4) Timeouts:   502, Allocated: 453,264 bytes
 5) Timeouts:   628, Allocated: 548,080 bytes
 6) Timeouts:   754, Allocated: 638,800 bytes
 7) Timeouts:   880, Allocated: 729,584 bytes
 8) Timeouts: 1,006, Allocated: 820,304 bytes
 9) Timeouts: 1,132, Allocated: 919,216 bytes
10) Timeouts: 1,258, Allocated: 1,011,928 bytes

Try it on Fiddle.

Around 800 bytes are leaked per operation, which is quite nasty. The memory is reclaimed every time a new value is written in the channel, so for a busy channel this design flaw should not be an issue. But for a channel that receives values sporadically, this can be a showstopper.
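
To see the reclamation in the repro above, one could add something like the following inside the outer loop (a hypothetical tweak, not part of the original test); writing a single value forces the channel to dequeue, and discard, the cancelled operations queued ahead of it:

// Hypothetical: one write per outer iteration drains the queued cancelled readers.
channel.Writer.TryWrite(0);
_ = await channel.Reader.ReadAsync(); // consume the dummy value so the channel stays empty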

There are other asynchronous queue implementations available that do not suffer from the same issue. You can try commenting out the await channel.Reader.ReadAsync(cts.Token); line and uncommenting either of the two lines below it. You will see that both the BufferBlock<T> from the TPL Dataflow library and the AsyncCollection<T> from the Nito.AsyncEx.Coordination package allow asynchronous retrieval from the queue with a timeout, without leaking memory.
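
For reference, a minimal sketch of the question's DequeueAsync built on a BufferBlock<T> instead of a Channel (illustrative only; it mirrors the timeout semantics of the question's code):

private readonly BufferBlock<T> _buffer = new BufferBlock<T>();

public async Task<T> DequeueAsync(int timeoutInMs = -1, CancellationToken cancellationToken = default)
{
    using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    if (timeoutInMs > 0) cts.CancelAfter(timeoutInMs);
    // Unlike Channel.Reader.ReadAsync, a cancelled ReceiveAsync does not leave
    // a queued operation behind, so repeated timeouts don't accumulate memory.
    return await _buffer.ReceiveAsync(cts.Token).ConfigureAwait(false);
}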

Theodor Zoulias
  • It's not a flaw at all. This is incorrect usage. Channels guarantee the order of operations so when a `ReadAsync` call is made, an `AsyncOperation` object is queued. These objects are dequeued only when a new item is posted. By using polling the code enqueues a ton of `AsyncOperation` objects that never get dequeued. That's not a flaw, because a channel isn't meant to be polled with `ReadAsync`. There's no reason to do so - the client will be notified immediately when new data arrives. For polling one should use `TryRead` – Panagiotis Kanavos May 20 '21 at 16:47
  • @Panagiotis "a channel isn't meant to be polled with `ReadAsync`." Where do you find this info in the docs? The [`ReadAsync`](https://docs.microsoft.com/en-us/dotnet/api/system.threading.channels.channelreader-1.readasync) accepts an optional `CancellationToken`, and nowhere says that if you cancel the token the channel will leak memory. This method should either not accept a token, or it should not leak, or it should be documented that is leaks. Microsoft knowingly letting random folks fall unexpectedly in the pit of failure, without them doing anything obviously wrong, is rather poor IMHO. – Theodor Zoulias May 20 '21 at 17:10
  • If you create the channel with `new UnboundedChannelOptions{SingleReader=true}` [memory usage doesn't change](https://dotnetfiddle.net/m7CaRQ) because there is no need to queue the read requests. A single reader can only make a single read request at a time – Panagiotis Kanavos May 20 '21 at 17:10
  • A CancellationToken means the method can be cancelled. It's neither a timeout nor a flag that says the method can be used for polling. `Microsoft knowingly letting random folks fall unexpectedly in the pit of failure` no, this took explicit effort to misuse the class. There are several articles on Channels by Stephen Toub, Steve Gordon and others. The SignalR docs show how to use channels to stream to clients. – Panagiotis Kanavos May 20 '21 at 17:11
  • @Panagiotis good find. So the `new UnboundedChannelOptions() { SingleReader = true }` is the solution to the OP's problem. Will you add it to your answer, or should I add it to mine? – Theodor Zoulias May 20 '21 at 17:15
  • @Panagiotis Microsoft [advertises](https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/) channels as *"a data structure that’s used to store produced data for a consumer to retrieve"*. This in my eyes means that it's a general-use tool for solving producer/consumer problems. Nowhere I see implied that it's a tool that must be used according to a strict pattern, and not following this pattern results to all hell breaking loose. Would you really be OK if all built-in APIs that accept tokens, leaked memory if some undocumented pattern was not followed? – Theodor Zoulias May 20 '21 at 17:25
  • @TheodorZoulias single reader doesn't solve the situation; in my reproducible demo, memory explodes as before – HooYao May 21 '21 at 16:27
  • @HooYao too bad then. Have you considered switching from the `Channel` to a `BufferBlock`, or some other alternative? – Theodor Zoulias May 21 '21 at 16:45
  • @TheodorZoulias thank you, I may switch to BufferBlock; it works as expected. I'm evaluating its performance; it shouldn't make any difference in my case. I actually came up with a nasty solution, sending a dummy message to the channel on timeout cancellation; it has minimal impact, but BufferBlock looks way cleaner. – HooYao May 22 '21 at 03:03
  • @HooYao yeap, I don't expect you to have any performance problems with a `BufferBlock`. It is a bit older technology, it predates `ValueTask`s so it doesn't take advantage of them, but otherwise it's an excellent tool. In case you want to expose its contents as an `IAsyncEnumerable`, there is a `ToAsyncEnumerable` extension method [here](https://stackoverflow.com/questions/49389273/for-a-tpl-dataflow-how-do-i-get-my-hands-on-all-the-output-produced-by-a-transf/62410007#62410007). – Theodor Zoulias May 22 '21 at 04:28

I was so preoccupied with the technical details of the actual problem that I forgot the problem is already solved almost out-of-the-box.

From the comments, it looks like the actual question is:

there is a processor to process the messages in batch. it starts to process when there are enough messages or time is up, that's where the timeout cancellation comes up

This is provided by the Buffer operator of ReactiveX.NET, which is built by the same team that creates System.Linq.Async:

ChannelReader<Message> reader = _channel;

IAsyncEnumerable<IList<Message>> batchItems = reader.ReadAllAsync(token)
                                                    .ToObservable()
                                                    .Buffer(TimeSpan.FromSeconds(30), 5)
                                                    .ToAsyncEnumerable();

await foreach(var batch in batchItems.WithCancellation(token))
{
    ....
}

These calls can be converted into an extension method, so instead of a DequeueAsync, the question's class could have a BufferAsync or GetWorkItemsAsync method:

public IAsyncEnumerable<T[]> BufferAsync(
            TimeSpan timeSpan,
            int count,
            CancellationToken cancellationToken = default)
{
    return _channel.Reader.BufferAsync(timeSpan,count,cancellationToken);
}

ToObservable and ToAsyncEnumerable are provided by System.Linq.Async and convert between IAsyncEnumerable and IObservable, the interface used by ReactiveX.NET.

Buffer is provided by System.Reactive and buffers items by count or period, even allowing for overlapping sequences.

While LINQ and LINQ to Async provide query operators over objects, Rx.NET does the same over time-based streams of events. It's possible to aggregate over time, buffer events by time, throttle them etc. The examples in the (unofficial) doc page for Buffer show how to create overlapping sequences (eg sliding windows). The same page shows how Sample or Throttle can be used to throttle fast event streams by propagating only the last event in a period.
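
For instance (a sketch; `source` here stands for any IObservable<Message>, such as the result of the ReadAllAsync(token).ToObservable() call above):

// Overlapping (sliding) windows: each window spans 30 seconds and a new one opens every 10
IObservable<IList<Message>> sliding = source.Buffer(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(10));

// Only the most recent event in each 5-second period
IObservable<Message> sampled = source.Sample(TimeSpan.FromSeconds(5));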

Rx uses a push model (new events are pushed to subscribers) while IAsyncEnumerable, like IEnumerable, uses a pull model. ToAsyncEnumerable() will cache items until they're requested, which can lead to problems if nobody's listening.

With these methods, one could even create extension methods to buffer or throttle the publishers:

    //Returns all items in a period
    public static IAsyncEnumerable<IList<T>> BufferAsync<T>(
        this ChannelReader<T> reader, 
        TimeSpan timeSpan, 
        int count,
        CancellationToken token = default)
    {
        return reader.ReadAllAsync(token)
            .ToObservable()
            .Buffer(timeSpan, count)
            .ToAsyncEnumerable();
    }

    //Returns the latest item in a period
    public static IAsyncEnumerable<T> SampleAsync<T>(
        this ChannelReader<T> reader, 
        TimeSpan timeSpan,
        CancellationToken token = default)
    {
        return reader.ReadAllAsync(token)
            .ToObservable()
            .Sample(timeSpan)
            .ToAsyncEnumerable();
    }
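
A hypothetical consumer of the BufferAsync extension above then boils down to an await foreach (ProcessBatch is an assumed handler, not from the answer):

await foreach (IList<Message> batch in reader.BufferAsync(TimeSpan.FromSeconds(30), 5, token))
{
    ProcessBatch(batch); //hypothetical batch handler
}
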
Panagiotis Kanavos
  • "*Rx uses a push model (new events are pushed to subscribers) while IAsyncEnumerable, like IEnumerable, use a pull model. ToAsyncEnumerable() will cache items until they're requested, which can lead to problems if nobody's listening.*" <== That's a serious flaw of this solution, and it's not fixable. I'll post later a correct solution to the problem, as an answer to [this](https://stackoverflow.com/questions/67661709/how-to-batch-an-iasyncenumerablet-enforcing-a-maximum-interval-policy-between) question. – Theodor Zoulias May 24 '21 at 09:26
  • @TheodorZoulias instead of assuming everything is broken, consider that tools are created to handle specific jobs. You can't use hammers with screws. And parallel/concurrent/async processing is a *huge* area that can't be covered by a single programming paradigm, much less a single technique. Remember how you answered every question with Semaphores before discovering ActionBlock? None of these things are new, we're using concepts and techniques created in the 1970s. .NET covers a *lot* of them and even allows combining them. You need to know which tool to use when. – Panagiotis Kanavos May 24 '21 at 09:32
  • @TheodorZoulias besides, the items will be cached somewhere. Either in the Channel, or the ConcurrentQueue created by `ToAsyncEnumerable`. No matter what queue or "solution" is used, published items will have to go *somewhere* until they're consumed. If a publisher sends 500 items and nobody's listening, those 500 items will have to remain in memory one way or another – Panagiotis Kanavos May 24 '21 at 09:36
  • Panagiotis if one consumer is not fast enough to cope with the flood of incoming messages, the OP could add more consumers. Each consumer should take from the channel as many messages as it can process, and no more. The solution in this answer will result in each consumer greedily draining the channel and putting unprocessed messages in a hidden queue, introducing more latency than the OP would be willing to accept. So this solution is flawed, and its flaw is demonstrable. And me being "kind" and not pointing out this flaw would do no good to anyone in the long run. – Theodor Zoulias May 24 '21 at 09:57
  • @TheodorZoulias you're missing the point. `Buffer`, `Sample` and `Throttle` are a way to handle what you describe by batching/discarding data so consumers can work on a batch at a time. Nothing prevents the use of multiple consumers anyway. Besides, the OP's question was how to create a batch of data, not how to use multiple consumers. That's trivial to do with ChannelReader - just pass it to all consumers. There wouldn't be any need for timeouts – Panagiotis Kanavos May 24 '21 at 10:09
  • Panagiotis the OP has a source of messages, and they want to process them in batches and with limited latency. That's the problem domain. I am not missing the point by focusing on the problem domain. I would miss the point if I was trying to *change* the problem domain so that it fits the tools I want to use. I may have done it on other occasions, but not on this one. I would like to ask you to stay focused on the problem too. Either point to a concrete usage of the `Sample`, `Throttle` etc. operators that solves *this* problem, or don't mention them at all. – Theodor Zoulias May 24 '21 at 10:22