
Consider the following code:

var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions());

var t1 = Task.Run(async () =>
{
    DateTime start = DateTime.Now;

    for (int i = 0; i < 100000000; i++)
    {
        await channel.Writer.WriteAsync(i);
    }

    Console.WriteLine($"Writer took {DateTime.Now - start}");
    channel.Writer.Complete();
});

var t2 = Task.Run(async () =>
{
    while (true)
    {
        try
        {
            int r = await channel.Reader.ReadAsync();
        }
        catch (ChannelClosedException) { break; }
    }
});

await Task.WhenAll(t1, t2);

This takes about 10 seconds, i.e. it outputs something like "Writer took 00:00:10.276747". If I comment out the whole while block, it takes about 6 seconds. This is pretty consistent over multiple runs.

Question: if Channel is supposed to be an efficient producer/consumer mechanism, why does consuming in this case affect the producer?

More curiously, if I add these two methods:

static async Task Produce(Channel<int> channel)
{
    DateTime start = DateTime.Now;

    for (int i = 0; i < 100000000; i++)
    {
        await channel.Writer.WriteAsync(i);
    }

    Console.WriteLine($"Writer took {DateTime.Now - start}");
    channel.Writer.Complete();
}

static async Task Consume(Channel<int> channel)
{
    while (true)
    {
        try
        {
            int r = await channel.Reader.ReadAsync();
        }
        catch (ChannelClosedException) { break; }
    }
}

and then do:

var t1 = Produce(channel);
var t2 = Consume(channel);
await Task.WhenAll(t1, t2);

They finish in around 6 seconds either way (while block uncommented vs commented).

Question: Why does involving an explicit thread with Task.Run affect the efficiency?

rory.ap
  • In all pub/sub queues publishers and subscribers have to coordinate. That adds overhead. No subscribers means no coordination. Try using `ReadAllAsync` with `await foreach`. `ReadAsync` waits for an available item with `WaitForReadAsync` on every call before using `TryRead` to return it. `ReadAllAsync` on the other hand will return all available items after `WaitForReadAsync`. In this particular example, it will return almost everything – Panagiotis Kanavos Jul 03 '22 at 13:16
  • Channels guarantee read order too. Each request to read is queued, ensuring consumers retrieve messages in the order they requested them. That's actually more efficient than having a two-, three-, or 50-way lock over a collection from multiple producers and consumers. There are optimizations for [a single reader channel](https://github.com/dotnet/runtime/blob/main/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs#L27) using the `SingleReader` option. – Panagiotis Kanavos Jul 03 '22 at 13:41
  • Finally, you shouldn't use just the wall clock to count efficiency. You should use BenchmarkDotNet to count real scenarios compared to other similar implementations and compare not just publishing time but overall time *and* allocations. GC affects timings as well, and posting 10M items when only one can be consumed at a time is *not* efficient. Go (the language) typically allows only 1 item in each channel by default to avoid flooding memory *and* because dumping everything in RAM won't make consumers work faster. – Panagiotis Kanavos Jul 03 '22 at 13:44
  • To answer your actual question, you should check the [source code](https://github.com/dotnet/runtime/tree/main/src/libraries/System.Threading.Channels/src/System/Threading/Channels) to see how channels are implemented. Deep down, [multi-reader unbounded channels](https://github.com/dotnet/runtime/blob/main/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs) use a ConcurrentQueue and a Queue of read requests. TryWrite enqueues a message *and* finds a reader to trigger – Panagiotis Kanavos Jul 03 '22 at 13:48
  • Check the code to [TryWrite](https://github.com/dotnet/runtime/blob/main/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs#L241) to see what's going on. With a single-reader channel though, both the [queueing and writing is simpler](https://github.com/dotnet/runtime/blob/main/src/libraries/System.Threading.Channels/src/System/Threading/Channels/SingleConsumerUnboundedChannel.cs#L281) – Panagiotis Kanavos Jul 03 '22 at 13:50
  • @PanagiotisKanavos you could consider moving your comments into an answer. Comments are intended for asking for more information or suggesting improvements. Answering questions in comments [should be avoided](https://prnt.sc/RfK7kkKxohGr "Use comments to ask for more information or suggest improvements. Avoid answering questions in comments."). – Theodor Zoulias Jul 03 '22 at 19:49
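For reference, the `ReadAllAsync` and `SingleReader` suggestions from the comments above might look roughly like this. It is a minimal sketch, not a benchmark: the item count of 100,000 and the running sum are arbitrary choices made only so the result can be checked.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// SingleReader promises the channel that only one consumer reads at a time.
var channel = Channel.CreateUnbounded<int>(
    new UnboundedChannelOptions { SingleReader = true });

const int itemCount = 100_000; // arbitrary, far smaller than in the question

var producer = Task.Run(async () =>
{
    for (int i = 0; i < itemCount; i++)
    {
        await channel.Writer.WriteAsync(i);
    }

    // Completing the writer is what ends the await foreach loop below.
    channel.Writer.Complete();
});

var consumer = Task.Run(async () =>
{
    long sum = 0;

    // ReadAllAsync drains the items that are available and only waits
    // when the channel is empty; no ChannelClosedException handling needed.
    await foreach (int item in channel.Reader.ReadAllAsync())
    {
        sum += item;
    }

    return sum;
});

await producer;
long total = await consumer;
Console.WriteLine(total); // sum of 0..99,999
```

The loop ends normally once the writer completes and the buffer is drained, so the try/catch around `ReadAsync` is no longer needed.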

1 Answer


This is an interesting question, but not because of any lack of efficiency. In fact, the question's numbers show that channels are very efficient. Writing to an unbounded channel involves:

  1. Writing to an internal ConcurrentQueue and
  2. Waking one of many possible readers to notify.

Going by the question's timings (about 10 seconds with a consumer versus 6 without), enqueuing and waking a reader takes only about 66% longer than simply enqueuing into a ConcurrentQueue. That's not bad at all. Unfortunately, that number is deceptive, especially in this case, where a Task or ValueTask is larger than the int payload and the "work" is negligible.
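The two steps listed above can be modeled roughly as follows. This is a toy sketch, not the real System.Threading.Channels code: the names `items`, `blockedReaders`, `gate`, `Write` and `ReadAsync` are made up for illustration, and the real implementation handles completion, cancellation and pooling that are left out here.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Toy model only: hypothetical names, no completion or cancellation support.
var items = new ConcurrentQueue<int>();                       // buffered messages
var blockedReaders = new Queue<TaskCompletionSource<int>>();  // parked read requests
var gate = new object();

// Writer side: buffer the item, or hand it to a reader that is already waiting.
void Write(int item)
{
    TaskCompletionSource<int> reader;
    lock (gate)
    {
        if (blockedReaders.Count == 0)
        {
            items.Enqueue(item); // nobody is waiting: just enqueue
            return;
        }

        reader = blockedReaders.Dequeue(); // oldest request is served first
    }

    reader.SetResult(item); // wake the reader outside the lock
}

// Reader side: take a buffered item, or park a request for the next write.
Task<int> ReadAsync()
{
    lock (gate)
    {
        if (items.TryDequeue(out int item))
        {
            return Task.FromResult(item);
        }

        var request = new TaskCompletionSource<int>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        blockedReaders.Enqueue(request);
        return request.Task;
    }
}

Write(1);                          // no reader waiting: buffered
int first = await ReadAsync();     // served straight from the buffer
Task<int> pending = ReadAsync();   // buffer empty: the reader parks
Write(2);                          // wakes the parked reader
int second = await pending;
Console.WriteLine($"{first}, {second}"); // 1, 2
```

With no consumer there is never a parked reader, so every write takes the cheap enqueue-only path. That is the coordination cost the question's timings are measuring.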

Benchmark libraries like BenchmarkDotNet run tests multiple times until they can get a statistically stable sample, with warmup and cooldown steps to account for JIT, caching and warmup effects.

To get a baseline, I used BenchmarkDotNet with this benchmark class. I couldn't resist adding a parameter for the SingleReader optimization, which assumes there can be only a single reader at a time and so uses a simpler queue and locking.

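using System;
using System.Threading.Channels;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;

[MemoryDiagnoser]
[ThreadingDiagnoser]
public class QuestionBenchmarks
{
    // Every benchmark runs twice: with and without the SingleReader optimization
    [Params(true, false)]
    public bool SingleReader;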

    static async Task Produce(Channel<int> channel)
    {
        for (int i = 0; i < 100000000; i++)
        {
            await channel.Writer.WriteAsync(i);
        }
        channel.Writer.Complete();
    }

    static async Task Consume(Channel<int> channel)
    {
        while (true)
        {
            try
            {
                int r = await channel.Reader.ReadAsync();
            }
            catch (ChannelClosedException) { break; }
        }
    }

    [Benchmark]
    public async Task InlinedBoth()
    {
        var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = SingleReader });

        var t1 = Task.Run(async () =>
        {
            for (int i = 0; i < 100000000; i++)
            {
                await channel.Writer.WriteAsync(i);
            }

            channel.Writer.Complete();
        });

        var t2 = Task.Run(async () =>
        {
            while (true)
            {
                try
                {
                    int r = await channel.Reader.ReadAsync();
                }
                catch (ChannelClosedException) { break; }
            }
        });

        await Task.WhenAll(t1, t2);
    }

   
    [Benchmark]
    public async Task InlinedProduceOnly()
    {
        var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = SingleReader });

        var t1 = Task.Run(async () =>
        {
            for (int i = 0; i < 100000000; i++)
            {
                await channel.Writer.WriteAsync(i);
            }

            channel.Writer.Complete();
        });

        await t1;
    }


    [Benchmark]
    public async Task WithMethods()
    {
        var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = SingleReader });

        var producer = Produce(channel);
        var consumer = Consume(channel);

        await Task.WhenAll(producer, consumer);
    }

    [Benchmark]
    public async Task WithMethodsProduceOnly()
    {
        var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = SingleReader });

        var producer = Produce(channel);

        await producer;
    }

}

And I got a big surprise, one that should have been expected:

// * Summary *

BenchmarkDotNet=v0.13.1, OS=Windows 10.0.22621
Intel Core i7-10850H CPU 2.70GHz, 1 CPU, 12 logical and 6 physical cores
.NET SDK=7.0.100-preview.5.22307.18
  [Host]     : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT
  DefaultJob : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT

With values

| Method | SingleReader | Mean | Error | StdDev | Completed Work Items | Lock Contentions | Gen 0 | Gen 1 | Gen 2 | Allocated |
|---|---|---:|---:|---:|---:|---:|---:|---:|---:|---:|
| InlinedBoth | False | 4.193 s | 0.0825 s | 0.0772 s | 9675.0000 | 32071.0000 | - | - | - | 265 KB |
| InlinedProduceOnly | False | 3.842 s | 0.0768 s | 0.1654 s | 1.0000 | - | 4000.0000 | 4000.0000 | 4000.0000 | 786,464 KB |
| WithMethods | False | 6.181 s | 0.1233 s | 0.2027 s | - | - | 4000.0000 | 4000.0000 | 4000.0000 | 786,463 KB |
| WithMethodsProduceOnly | False | 3.805 s | 0.0753 s | 0.0837 s | - | - | 4000.0000 | 4000.0000 | 4000.0000 | 786,462 KB |
| InlinedBoth | True | 4.342 s | 0.1200 s | 0.3483 s | 84484.0000 | 22848.0000 | - | - | - | 71 KB |
| InlinedProduceOnly | True | 2.990 s | 0.0595 s | 0.0908 s | 1.0000 | - | 3000.0000 | 3000.0000 | 3000.0000 | 393,230 KB |
| WithMethods | True | 4.158 s | 0.0814 s | 0.1426 s | - | - | 3000.0000 | 3000.0000 | 3000.0000 | 393,230 KB |
| WithMethodsProduceOnly | True | 2.879 s | 0.0547 s | 0.0512 s | - | - | 3000.0000 | 3000.0000 | 3000.0000 | 393,232 KB |

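Completed Work Items is the number of work items completed on the ThreadPool. The benchmarks that call the methods directly don't use the ThreadPool at all, which is to be expected since they don't use Task.Run. WriteAsync on an unbounded channel completes synchronously, so Produce runs to completion on the calling thread before Consume is even called. That code never uses multiple threads, so there are no lock contentions. The same is true of the benchmarks that have no consumers.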
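The single-threaded behaviour of the method-based version can be checked directly. This is a minimal sketch, assuming a default unbounded channel and an arbitrary count of 1,000 items; `callerThread`, `producerThread` and the `count` parameter are names added for illustration.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();
int callerThread = Environment.CurrentManagedThreadId;
int producerThread = -1;

// Same shape as Produce in the question, with a small arbitrary item count.
async Task Produce(Channel<int> ch, int count)
{
    for (int i = 0; i < count; i++)
    {
        // On an unbounded channel this returns an already-completed ValueTask,
        // so the await never yields and the loop keeps running synchronously.
        await ch.Writer.WriteAsync(i);
    }

    producerThread = Environment.CurrentManagedThreadId;
    ch.Writer.Complete();
}

// No Task.Run: the whole loop runs before the call returns.
Task producer = Produce(channel, 1_000);
bool finishedBeforeReturn = producer.IsCompleted;
bool sameThread = producerThread == callerThread;

Console.WriteLine($"Finished before the call returned: {finishedBeforeReturn}"); // True
Console.WriteLine($"Ran on the calling thread: {sameThread}");                   // True
```

By the time a consumer is started after this call, every item is already buffered and the channel is completed, so producer and consumer never run concurrently.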

This means the benchmarks can't be compared directly. Even so, it's obvious that SingleReader uses less memory: about 393 MB allocated instead of 786 MB in every benchmark that buffers all the items.

The entire benchmark with the 100M items took 28 minutes, so I'll wait for a bit before creating a new, correct benchmark with far fewer items.

Global total time: 00:28:23 (1703.72 sec), executed benchmarks: 10
Panagiotis Kanavos
  • Thank you for all your hard work! “ Same with the code that has no producers.” did you mean consumers? – rory.ap Jul 04 '22 at 14:30