
I have the below code:

var channel = Channel.CreateUnbounded<string>();

var consumers = Enumerable
    .Range(1, 5)   
    .Select(consumerNumber =>
        Task.Run(async () =>
        {
            var rnd = new Random();
            while (await channel.Reader.WaitToReadAsync())
            {
                if (channel.Reader.TryRead(out var item))
                {
                    Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
                }
            }
        }));

var producers = Enumerable
    .Range(1, 5)    
    .Select(producerNumber =>
        Task.Run(async () =>
        {
            var rnd = new Random();
            for (var i = 0; i < 10; i++)
            {
                var t = $"Message {i}";
                Console.WriteLine($"Producing {t} on producer {producerNumber}");

                await channel.Writer.WriteAsync(t);
                await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
            }
        }));

await Task.WhenAll(producers)
    .ContinueWith(_ => channel.Writer.Complete());

await Task.WhenAll(consumers);

This works as it should, but I want it to consume at the same time as it produces. However,

await Task.WhenAll(producers)
    .ContinueWith(_ => channel.Writer.Complete());

blocks the consumers from running until it completes, and I can't think of a way of getting both to run at the same time.

Houlahan
  • You can improve your code a *lot* if you use some Channel coding patterns. The problem isn't `await Task.WhenAll` and there's nothing wrong with `ContinueWith` if used properly. There *are* problems with throwing exceptions to simulate control flow though – Panagiotis Kanavos Sep 21 '21 at 06:45
  • 1
    I posted an answer that shows how channels can be used to not only simplify the code but make coordination, composition and error handling a *lot* easier – Panagiotis Kanavos Sep 21 '21 at 07:35
  • Worse, if you use exception blocks to complete the channel, the *consumer* will never know the producer threw. In this case it may not matter a lot but in production code this could cause eg a database consumer to commit changes even after errors – Panagiotis Kanavos Sep 21 '21 at 07:51
  • 1
    Now that .NET 6 is almost out, you can simplify the idiomatic way of using multiple consumers with `Parallel.ForEachAsync`, eg `await Parallel.ForEachAsync(reader.ReadAllAsync(token),new (){MaxDegreeOfParallelism=3},item=>Console.WriteLine(item))`. You'll have to completely rewrite non-idiomatic code – Panagiotis Kanavos Oct 26 '21 at 12:08

2 Answers


There are a couple of issues with the code, including forgetting to enumerate the producers and consumers enumerables. IEnumerable is evaluated lazily, so until you actually enumerate it, eg with foreach or ToList, nothing is generated.

There's nothing wrong with ContinueWith when used properly either. It's definitely better and cheaper than using exceptions as control flow.

The code can be improved a lot by using some common Channel coding patterns.

  1. The producer owns and encapsulates the channel
  2. The producer exposes only Reader(s)

Plus, ContinueWith is an excellent choice to signal a ChannelWriter's completion, as we don't care at all which thread will do that. If anything, we'd prefer to use one of the "worker" threads to avoid a thread switch.

Let's say the producer function is:

Task Produce(ChannelWriter<string> writer, int producerNumber)
{
    return Task.Run(async () =>
    {
        var rnd = new Random();
        for (var i = 0; i < 10; i++)
        {
            var t = $"Message {i}";
            Console.WriteLine($"Producing {t} on producer {producerNumber}");

            await writer.WriteAsync(t);
            await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
        }
    });
}

Producer

The producer can be:

ChannelReader<string> ProduceData(int dop)
{
    var channel = Channel.CreateUnbounded<string>();
    var writer = channel.Writer;

    var tasks = Enumerable.Range(0, dop)
                 .Select(producerNumber => Produce(writer, producerNumber))
                 .ToList();
    _ = Task.WhenAll(tasks).ContinueWith(t => writer.TryComplete(t.Exception));

    return channel.Reader;
}

Completion and error propagation

Notice the line:

_ = Task.WhenAll(tasks).ContinueWith(t => writer.TryComplete(t.Exception));

This says that as soon as the producers complete, the writer itself should complete with any exception that may be raised. It doesn't really matter what thread the continuation runs on as it doesn't do anything other than call TryComplete.

More importantly, t=>writer.TryComplete(t.Exception) propagates the worker exception(s) to downstream consumers. Otherwise the consumers would never know something went wrong. If you had a database consumer you'd want it to avoid finalizing any changes if the source aborted.
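As a rough sketch (the method name here is hypothetical), a consumer can observe that propagated failure by wrapping its read loop in a try/catch; once the remaining items are drained, the error passed to TryComplete surfaces from the read loop:

async Task ConsumeWithErrorHandling(ChannelReader<string> reader)
{
    try
    {
        await foreach (var item in reader.ReadAllAsync())
        {
            Console.WriteLine($"Consuming {item}");
        }
        // Clean completion: safe to finalize work here, eg commit a transaction.
    }
    catch (Exception exc)
    {
        // A producer faulted: avoid finalizing partial work.
        Console.WriteLine($"Upstream failed: {exc.Message}");
        throw;
    }
}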

Consumer

The consumer method can be:

async Task Consume(ChannelReader<string> reader,int dop,CancellationToken token=default)
{
    var tasks= Enumerable
        .Range(1, dop)   
        .Select(consumerNumber =>
            Task.Run(async () =>
            {
                await foreach(var item in reader.ReadAllAsync(token))
                {
                    Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
                }
            }));
    await Task.WhenAll(tasks);
}

In this case, await Task.WhenAll(tasks); enumerates the worker tasks, thus starting them.

Nothing else is needed to consume all generated messages. When all producers finish, the channel writer is completed. When that happens, ReadAllAsync will keep offering all remaining messages to the consumers and then complete.

Composition

Combining both methods is as easy as:

var reader = ProduceData(10);
await Consume(reader, 5);

Since ProduceData only starts the producer tasks and returns the reader immediately, the consumers run concurrently with the producers.

General Pattern

This is a general pattern for pipeline stages using Channels - read the input from a ChannelReader, write it to an internal Channel and return only the owned channel's Reader. This way the stage owns the channel which makes completion and error handling a lot easier:

static ChannelReader<TOut> Crunch<Tin, TOut>(this ChannelReader<Tin> reader, int dop = 1, CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<TOut>();
    var writer = channel.Writer;

    var tasks = Enumerable.Range(0, dop)
              .Select(i => Task.Run(async () =>
              {
                  await foreach (var item in reader.ReadAllAsync(token))
                  {
                      try
                      {
                          ...
                          await writer.WriteAsync(msg);
                      }
                      catch (Exception exc)
                      {
                          //Handle the exception and keep processing messages
                      }
                  }
              }, token));
    _ = Task.WhenAll(tasks)
           .ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}

This allows chaining multiple "stages" together to form a pipeline:

var finalReader=Producer(...)
                .Crunch1()
                .Crunch2(10)
                .Crunch3();
await foreach(var result in finalReader.ReadAllAsync())
{
...
}

Producer and consumer methods can be written in the same way, allowing, eg the creation of a data import pipeline:

var importTask = ReadFiles(somePath)
                  .ParseCsv<string,Record[]>(10)
                  .ImportToDb<Record>(connectionString);

await importTask;

With ReadFiles

static ChannelReader<string> ReadFiles(string folder)
{
    var channel=Channel.CreateUnbounded<string>();
    var writer=channel.Writer;

    var task=Task.Run(async ()=>{
        foreach(var path in Directory.EnumerateFiles(folder,"*.csv"))
        {
            await writer.WriteAsync(path);
        }
    });
    task.ContinueWith(t=>writer.TryComplete(t.Exception));
    return channel.Reader;
}
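A terminal stage like ImportToDb consumes a reader and returns a Task instead of another reader. A rough sketch of what such a stage could look like, with the signature assumed here and the actual database work elided:

static async Task ImportToDb<T>(this ChannelReader<T> reader, string connectionString)
{
    await foreach (var record in reader.ReadAllAsync())
    {
        // Open a connection with connectionString and insert the record (elided).
    }
    // Completes only after the upstream writer completes; an upstream failure
    // propagates out of ReadAllAsync before any final commit happens here.
}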

Update for .NET 6 Parallel.ForEachAsync

Now that .NET 6 is supported in production, one could use Parallel.ForEachAsync to simplify a concurrent consumer to:

static ChannelReader<TOut> Crunch<Tin, TOut>(this ChannelReader<Tin> reader,
                           int dop = 1, CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<TOut>();
    var writer = channel.Writer;

    var options = new ParallelOptions {
                MaxDegreeOfParallelism = dop,
                CancellationToken = token
    };
    var task = Parallel.ForEachAsync(
                 reader.ReadAllAsync(token),
                 options,
                 async (item, ct) => {
                      try
                      {
                          ...
                          await writer.WriteAsync(msg);
                      }
                      catch (Exception exc)
                      {
                          //Handle the exception and keep processing messages
                      }
                  });
    task.ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}
Panagiotis Kanavos

The consumers and producers variables are of type IEnumerable<Task>. This is a deferred enumerable that needs to be materialized in order for the tasks to be created. You can materialize the enumerable by chaining the ToArray operator on the LINQ queries. By doing so, the type of the two variables becomes Task[], which means that your tasks are instantiated and up and running.
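For example, a sketch of that minimal change applied to the consumers query from the question (the producers query gets the same .ToArray() treatment):

var consumers = Enumerable
    .Range(1, 5)
    .Select(consumerNumber =>
        Task.Run(async () =>
        {
            while (await channel.Reader.WaitToReadAsync())
            {
                if (channel.Reader.TryRead(out var item))
                {
                    Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
                }
            }
        }))
    .ToArray(); // materializes the query: all 5 consumer tasks are created and running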

As a side note, the ContinueWith method requires explicitly passing TaskScheduler.Default as an argument, otherwise you are at the mercy of whatever the TaskScheduler.Current may be (it might be the UI TaskScheduler for example). This is the correct usage of ContinueWith:

await Task.WhenAll(producers)
    .ContinueWith(_ => channel.Writer.Complete(), TaskScheduler.Default);
  1. Code analyzer CA2008: Do not create tasks without passing a TaskScheduler
  2. "[...] This is why in production library code I write, I always explicitly specify the scheduler I want to use." (Stephen Toub)

Another problem is that any exceptions thrown by the producers will be swallowed, because the tasks are not awaited. Only the continuation is awaited, which is unlikely to fail. To solve this problem, you could just ditch the primitive ContinueWith, and instead use async-await composition (an async local function that awaits the producers and then completes the channel). In this case not even that is necessary. You could simply do this:

try { await Task.WhenAll(producers); }
finally { channel.Writer.Complete(); }

The channel will Complete after any outcome of the Task.WhenAll(producers) task, and so the consumers will not get stuck.
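For illustration, the async-await composition mentioned above could be sketched as a local function (the name is hypothetical); the try/finally shown above is just the inlined version of the same idea:

// Completes the writer once all producers finish, whether they succeed or fail,
// without ContinueWith and therefore without any TaskScheduler concerns.
async Task CompleteWhenProducersFinishAsync()
{
    try { await Task.WhenAll(producers); }
    finally { channel.Writer.Complete(); }
}

await CompleteWhenProducersFinishAsync();
await Task.WhenAll(consumers);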

A third problem is that a failure of some of the producers will cause the immediate termination of the current method, before awaiting the consumers. These tasks will then become fire-and-forget tasks. I am leaving it to you to find how you can ensure that all tasks can be awaited, in all cases, before exiting the method either successfully or with an error.

Theodor Zoulias
  • @Houlahan not quite. There's nothing wrong with `ContinueWith` in this context. You don't care what thread is used to call `writer.Complete()` *at all*. On the other hand, using exceptions as control flow like this is a problem: it's expensive, harder to debug and, worse, worker exceptions are never sent to downstream channels, which never realize the producer aborted. – Panagiotis Kanavos Sep 21 '21 at 07:38