1

I've created two lists of tasks, and I'd like to start processing both lists concurrently.

Although the total number of tasks is relatively small (perhaps fewer than 10 in total), the processing time for each task varies, and I can start two tasks simultaneously as long as they target different APIs.

To differentiate between the APIs I've created two lists, and have populated each task list based on the API that will be utilized.

Within each task list, I'd like the tasks processed sequentially; however, the processing order within a list is not important.

At any given instant, two tasks could be running concurrently as long as the two tasks are not members of the same task list.

My first thought was to build along these lines:

var lgoTasks = new List<Task>();
foreach (var x in lgo)
{
  var t = sClient.SubmitToMarket(x.Id);
  lgoTasks.Add(t);
}

var opApiTasks = new List<Task>();
foreach (var x in opApi)
{
  var t = sClient.SubmitToMarket(x.Id);
  opApiTasks.Add(t);
}

await Task.WhenAll(lgoTasks, opApiTasks);

But it fails for two (or more?) reasons:

  1. WhenAll() won't accept two (or more) Task Arrays
  2. Even if WhenAll() accepted two Task Arrays, the processing isn't sequential

Next I tried this approach:

var lgoCount = lgo.Count();
var opApiCount = opApi.Count();

var maxCounter = lgoCount > opApiCount ? lgoCount : opApiCount;
try
{
  for (int i = 0; i < maxCounter; i++)
  {
    if (lgoCount - 1 >= i && opApiCount - 1 >= i)
    {
      var x = sClient.SubmitToMarket(lgo[i].Id);
      var y = sClient.SubmitToMarket(opApi[i].Id);

      await Task.WhenAll(x, y);
    }
    else if (lgoCount - 1 >= i)
    {
      await sClient.SubmitToMarket(Convert.ToInt32(lgo[i].Id));
    }
    else if (opApiCount - 1 >= i)
    {
      await sClient.SubmitToMarket(Convert.ToInt32(opApi[i].Id));
    }
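  }
}
catch (Exception)
{
  // The catch block was not included in the original post; rethrow as a placeholder.
  throw;
}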

And although it somewhat works, WhenAll() blocks progress until both of its tasks have completed.

How do I start both lists of tasks concurrently, making sure that just one task from each list is running at a time, without creating blockers with WhenAll() (like I did)? All tasks in both task lists must finish before my method returns control to the caller.

Thanks kindly for considering my question, and I look forward to your comments.

Bill Roberts
  • You can use `Parallel.Invoke` to run those two loops in parallel – pm100 Mar 16 '22 at 23:23
  • A somewhat similar question, that deals with a more general problem: [Dynamically processing a concurrent collection in parallel by group but serially within each group](https://stackoverflow.com/questions/71000722/dynamically-processing-a-concurrent-collection-in-parallel-by-group-but-serially) – Theodor Zoulias Mar 17 '22 at 01:36
  • @pm100 the `Parallel.Invoke` is not async-friendly. It doesn't understand async delegates. – Theodor Zoulias Mar 17 '22 at 01:39

4 Answers

2

Solution

You have: two lists of tasks (all of which, by the way, are already started; please see below)

You need: two tasks each of which executes its 'children' sequentially.

Here's a toy example:

Console.WriteLine("Main start");
await Task.WhenAll(WorkAsync("A"), WorkAsync("B"));
Console.WriteLine("Main done.");

async Task WorkAsync(string type)
{
    for(int i = 0; i < 3; i++ )
        await WaitAndPrintAsync(type+i);
}

async Task WaitAndPrintAsync(string s)
{
    Console.WriteLine($"{s} start");
    await Task.Delay(TimeSpan.FromMilliseconds(100));
    Console.WriteLine($"{s} end");
}

This results in

Main start
A0 start
B0 start
B0 end
A0 end
A1 start
B1 start
A1 end
B1 end
A2 start
B2 start
B2 end
A2 end
Main done.
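
Applied to the two lists from the question, the same pattern might look like the sketch below. It is only an illustration under assumptions: `SubmitAsync` is a hypothetical stand-in for `sClient.SubmitToMarket`, and the integer ids stand in for `x.Id`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical ids standing in for the items of the lgo and opApi lists.
var lgoIds = new List<int> { 1, 2, 3 };
var opApiIds = new List<int> { 10, 20 };

// Thread-safe log so the ordering can be inspected afterwards.
var log = new ConcurrentQueue<string>();

// Each call returns one task that walks its own list sequentially;
// WhenAll waits for both sequences, which overlap with each other.
await Task.WhenAll(
    ProcessSequentiallyAsync("lgo", lgoIds),
    ProcessSequentiallyAsync("opApi", opApiIds));

foreach (var line in log) Console.WriteLine(line);

async Task ProcessSequentiallyAsync(string api, IEnumerable<int> ids)
{
    foreach (var id in ids)
    {
        // The next item is not submitted until this one has completed.
        await SubmitAsync(api, id);
    }
}

// Hypothetical stand-in for sClient.SubmitToMarket(x.Id).
async Task SubmitAsync(string api, int id)
{
    log.Enqueue($"{api} {id} start");
    await Task.Delay(50);
    log.Enqueue($"{api} {id} end");
}
```

Within each list the start/end pairs never interleave, while entries from the two lists can overlap freely. The lists may also have different lengths, which removes the index bookkeeping of the loop-based attempt.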

Tasks are already started

When you're doing

var lgoTasks = new List<Task>();
foreach (var x in lgo)
{
  var t = sClient.SubmitToMarket(x.Id);
  lgoTasks.Add(t);
}

you are starting the tasks; you are just not waiting for them to finish.

Here's a simple example to prove this.

Console.WriteLine("Main start");

var lgoTasks = new List<Task>();
for (int i = 0; i < 3; i++)
{
    Task t = WaitAndPrintAsync("A" + i);
    lgoTasks.Add(t);
}

var opApiTasks = new List<Task>();
for (int i = 0; i < 3; i++)
{
    Task t = WaitAndPrintAsync("B" + i);
    opApiTasks.Add(t);
}

Console.WriteLine("Before WhenAll");
var allTasks = lgoTasks.Concat(opApiTasks);
await Task.WhenAll(allTasks);

Console.WriteLine("Main done.");

async Task WaitAndPrintAsync(string s)
{
    Console.WriteLine($"{s} start");
    await Task.Delay(TimeSpan.FromMilliseconds(100));
    Console.WriteLine($"{s} end");
}

This prints:

Main start
A0 start
A1 start
A2 start
B0 start
B1 start
B2 start
Before WhenAll
B0 end
B2 end
B1 end
A1 end
A2 end
A0 end
Main done.

Notes:

  1. All tasks are started before WhenAll is called.
  2. Since lgoTasks and opApiTasks are lists of independent tasks, I was able to just concatenate both lists and use Task.WhenAll(IEnumerable<Task> tasks) to wait for them to finish.
tymtam
  • I've confirmed on my machine that this process works as you've coded it, and that it still works when modified to be closer to my needs. But when implemented in my solution, "A" ("lgo") starts and finishes before "B" ("op") starts, using `await Task.WhenAll(WorkAsync(sClient, lgo), WorkAsync(sClient, op));` and here is the worker: `private async Task WorkAsync(SubmissionsClient sClient, List outputRecords) { foreach (var x in outputRecords) // execute in sequence (in different tasks) await sClient.SubmitToMarket(x.Id); }` – Bill Roberts Mar 17 '22 at 17:25
  • It could be a few things. Can you confirm that SubmitToMarket is truly async, as opposed to pretend async (pretend = declared async, but with no awaits inside)? – tymtam Mar 18 '22 at 02:02
  • That crossed my mind, SubmitToMarket not being truly async, but @BitLauncher's solution appears to be working, and I am too obtuse to understand why. I will dig deeper – Bill Roberts Mar 18 '22 at 14:03
1

Does SubmitToMarket already start a new task and return it? If so, then e.g. all lgoTasks will run concurrently! As far as I understand your description, my solution would be roughly this (no warranty against syntax errors):

var lgoSequenceTask = new Task(async () =>
        {
            foreach (var x in lgo) // execute in sequence (in different tasks)
            {
                await sClient.SubmitToMarket(x.Id);
            }
        }
    );

var opApiSequenceTask = new Task(async () => 
        {
            foreach (var x in opApi) // execute in sequence (in different tasks)
            {
                await sClient.SubmitToMarket(x.Id);
            }
        }
    );

var bothTasks = new List<Task>() { lgoSequenceTask, opApiSequenceTask};

foreach (var task in bothTasks) // start both sequences concurrently
{
    task.Start(); 
}

await Task.WhenAll(bothTasks);

My understanding is that you want timing like the following example:

        a                               b
lgo:    <-L1-><----L2---><-L3-><---L4-->|
opApi:   <--O1--><-O2-><----O3--->      |

times:
a: both tasks start in loop,
b: Task.WhenAll finished at |

Updated

The commenters provided valuable input. I added a running solution to my async/await sandbox project. A note: if SubmitToMarket never changes the thread internally (which can happen), then some solutions won't really run concurrently, but fully sequentially. Therefore I added the variable doesSubmitChangeThread and the tasks lgoBgTask/opBgTask. If SubmitToMarket always (or mostly) returns on a different thread than the one it was started on, the parts of my code that use lgoBgTask/opBgTask are not needed (but doesSubmitChangeThread would have to be set to true).

Here is the code, which I ran twice:

    private bool doesSubmitChangeThread = false;
    private async void StackOverflowQuestion71505300Async(bool awaitChecked, bool runInOwnThreadChecked)
    {
        var lgo = new string[] { "A1", "A2", "A3" };
        var opApi = new String[] { "B1", "B2", "B3" };

        Log("Starting tasks");
        // intentionally start two tasks
        var lgoBgTask = Task.Run<Task>(() => RunListInSequenceReturningTask(lgo));
        var opBgTask = Task.Run<Task>(() => RunListInSequenceReturningTask(opApi));
        
        Log("Waiting for results"); // results, that are awaitable
        var lgoSequenceTask = lgoBgTask.Result;
        var opApiSequenceTask = opBgTask.Result;

        Log("Awaiting for all tasks...");
        await Task.WhenAll(lgoSequenceTask, opApiSequenceTask);

        Log("All tasks are done.");
    }

    private Task RunListInSequenceReturningTask(string[] list)
    {
        try
        {
            return RunListInSequenceAsync(list);
        }
        catch (Exception exc)
        {
            return Task.FromException(exc);
        }
    }

    private async Task RunListInSequenceAsync(string[] list)
    {
        foreach (var x in list) // execute in sequence (changing optionally threads)
        {
            await sClient_SubmitToMarket(x);
        }
    }

    private async Task sClient_SubmitToMarket(string x)
    {
        if (doesSubmitChangeThread)
        {
            await Task.Run(() => Submit(x));
        }
        else
        {
            Submit(x);
            await Task.CompletedTask;
        }
    }

    private void Submit(string x)
    {
        Log($"{x} start");
        Thread.Sleep(100);
        Log($"{x} end");
    }

Depending on the variable there are different outputs:

doesSubmitChangeThread = false

Example 'Stackoverflow 2 lists concurrently' starting:
001 2022-03-20T15:02:30.8741675+01:00 Thread 1: Run method in UI thread
002 2022-03-20T15:02:30.8811230+01:00 Thread 1: Starting tasks
003 2022-03-20T15:02:30.8913657+01:00 Thread 1: Waiting for results
004 2022-03-20T15:02:30.9293240+01:00 Thread 5: B1 start
005 2022-03-20T15:02:30.9293240+01:00 Thread 3: A1 start
006 2022-03-20T15:02:31.0322484+01:00 Thread 3: A1 end
007 2022-03-20T15:02:31.0322484+01:00 Thread 5: B1 end
008 2022-03-20T15:02:31.0326192+01:00 Thread 3: A2 start
009 2022-03-20T15:02:31.0326192+01:00 Thread 5: B2 start
011 2022-03-20T15:02:31.1417688+01:00 Thread 3: A2 end
010 2022-03-20T15:02:31.1417688+01:00 Thread 5: B2 end
012 2022-03-20T15:02:31.1417688+01:00 Thread 3: A3 start
013 2022-03-20T15:02:31.1421069+01:00 Thread 5: B3 start
015 2022-03-20T15:02:31.2505188+01:00 Thread 3: A3 end
014 2022-03-20T15:02:31.2505188+01:00 Thread 5: B3 end
016 2022-03-20T15:02:31.2544658+01:00 Thread 1: Awaiting for all tasks...
017 2022-03-20T15:02:31.2564643+01:00 Thread 1: All tasks are done.

doesSubmitChangeThread = true

Example 'Stackoverflow 2 lists concurrently' starting:
001 2022-03-20T15:05:02.5879830+01:00 Thread 1: Run method in UI thread
002 2022-03-20T15:05:02.5929698+01:00 Thread 1: Starting tasks
003 2022-03-20T15:05:02.6019152+01:00 Thread 1: Waiting for results
004 2022-03-20T15:05:02.6480793+01:00 Thread 5: A1 start
005 2022-03-20T15:05:02.6480793+01:00 Thread 6: B1 start
006 2022-03-20T15:05:02.6593799+01:00 Thread 1: Awaiting for all tasks...
007 2022-03-20T15:05:02.7651641+01:00 Thread 6: B1 end
008 2022-03-20T15:05:02.7651641+01:00 Thread 5: A1 end
009 2022-03-20T15:05:02.7653251+01:00 Thread 3: B2 start
010 2022-03-20T15:05:02.7681511+01:00 Thread 6: A2 start
012 2022-03-20T15:05:02.8757320+01:00 Thread 3: B2 end
011 2022-03-20T15:05:02.8757320+01:00 Thread 6: A2 end
014 2022-03-20T15:05:02.8767286+01:00 Thread 5: A3 start
013 2022-03-20T15:05:02.8760494+01:00 Thread 4: B3 start
016 2022-03-20T15:05:02.9857143+01:00 Thread 5: A3 end
015 2022-03-20T15:05:02.9857143+01:00 Thread 4: B3 end
017 2022-03-20T15:05:02.9897134+01:00 Thread 1: All tasks are done.

What are your opinions about the update?

BitLauncher
  • You might as well use `Task.Run` when you create the tasks and then finish with `await Task.WhenAll(lgoSequenceTask, opApiSequenceTask);`. It's a bit simpler. – Enigmativity Mar 17 '22 at 00:07
  • @Enigmativity, thanks, I just tried to keep the start of both tasks within 3 lines, IMHO this is easier to read - I wrote the code a bit like a unit test: arrange, act (start both tasks) and finally Wait instead of assert :-), and I do not know all the overloaded method signatures by heart :-). – BitLauncher Mar 17 '22 at 00:17
  • My suggestion reduces the number of lines. – Enigmativity Mar 17 '22 at 00:47
  • The `Task` constructor does not understand async delegates. You have created two [`async void`](https://docs.microsoft.com/en-us/archive/msdn-magazine/2013/march/async-await-best-practices-in-asynchronous-programming#avoid-async-void) delegates there. The task does not represent the completion of this delegate. It will complete when the first `await` is reached. An `async void` method cannot be awaited. Also starting a cold task with `Start` and without specifying the scheduler has [known problems](https://docs.microsoft.com/en-us/dotnet/fundamentals/code-analysis/quality-rules/ca2008). – Theodor Zoulias Mar 17 '22 at 01:27
  • @TheodorZoulias, this solution appears to be working. I've visited the known problems resource, but I'm unsure how it applies here, or how this solution could be updated to resolve the concern. Regardless, I'll continue testing before marking this as the answer. – Bill Roberts Mar 17 '22 at 18:32
  • The `Task.Factory.StartNew` method should not be called [without configuring the scheduler](https://learn.microsoft.com/en-us/dotnet/fundamentals/code-analysis/quality-rules/ca2008). It's a low-level API that [should be avoided in general](https://blog.stephencleary.com/2013/08/startnew-is-dangerous.html). Using the `Task.Run` is preferable. – Theodor Zoulias Mar 20 '22 at 16:32
  • Great @Theodor Zoulias, I updated the code accordingly. – BitLauncher Mar 20 '22 at 20:34
  • I still don't understand why you are offloading the `RunListInSequenceReturningTask` invocation to the `ThreadPool` and then blocking (`.Result`) the current thread while the invocation is taking place. What's the benefit of doing this? The `Task.Run` [understands async delegates](https://devblogs.microsoft.com/pfxteam/task-run-vs-task-factory-startnew/), and can give you back a `Task` that represents both the invocation, and the asynchronous operation. – Theodor Zoulias Mar 20 '22 at 20:55
1

This is intended as an improved version of BitLauncher's answer. Instead of creating two cold tasks with the Task constructor and starting them with the Start method, we define two asynchronous methods and we invoke them sequentially. This way we get two tasks that have already started.

async Task Run1()
{
    foreach (var x in lgo)
    {
        await sClient.SubmitToMarket(x.Id);
    }
}

async Task Run2()
{
    foreach (var x in opApi)
    {
        await sClient.SubmitToMarket(x.Id);
    }
}

Task task1 = Run1();
Task task2 = Run2();

await Task.WhenAll(task1, task2);

It's a simple but not perfect solution. The two tasks (task1 and task2) are not coordinated, and so a failure in one task will not cause the prompt termination of the other. In case this is a problem, you could consider fixing it by adding a CancellationTokenSource in the mix, observing it in each iteration in both tasks, and canceling it in the catch block of both tasks.

There is also some code duplication, which will increase if you follow the CancellationTokenSource advice. I would probably improve it quite a lot if it were for my own project, but I think that this simple form is quite good for demonstration purposes.
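
The CancellationTokenSource idea could be sketched roughly as follows, with one shared helper in place of Run1 and Run2 to limit the duplication. This is an illustration under assumptions, not a finished implementation: `SubmitAsync` is a hypothetical stand-in for `sClient.SubmitToMarket`, the item names are made up, and "B2" is made to fail on purpose. Whether the real method accepts a CancellationToken is unknown, so the token is only checked between items.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var started = new ConcurrentQueue<string>();
using var cts = new CancellationTokenSource();

// Both sequences share the same CancellationTokenSource.
Task task1 = RunSequenceAsync(new[] { "A1", "A2", "A3", "A4", "A5" });
Task task2 = RunSequenceAsync(new[] { "B1", "B2", "B3" });

try
{
    await Task.WhenAll(task1, task2);
}
catch (Exception ex)
{
    // Awaiting WhenAll rethrows only one exception;
    // task1.Exception and task2.Exception hold the per-task details.
    Console.WriteLine($"Stopped early: {ex.GetType().Name}");
}

Console.WriteLine("Started: " + string.Join(", ", started));

async Task RunSequenceAsync(IEnumerable<string> items)
{
    try
    {
        foreach (var item in items)
        {
            // Observe the shared token before starting each item.
            cts.Token.ThrowIfCancellationRequested();
            await SubmitAsync(item);
        }
    }
    catch
    {
        // Any failure here asks the other sequence to stop as well.
        cts.Cancel();
        throw;
    }
}

// Hypothetical stand-in for sClient.SubmitToMarket; "B2" fails deliberately.
async Task SubmitAsync(string item)
{
    started.Enqueue(item);
    await Task.Delay(50);
    if (item == "B2") throw new InvalidOperationException($"{item} failed");
}
```

An item that is already in flight when the failure happens still runs to completion; only the items after it are skipped.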


Update: In case SubmitToMarket is a method with an asynchronous contract but a synchronous implementation, you can ensure¹ that both loops will start working at the same time by invoking them on the ThreadPool. This can be done by wrapping them in a Task.Run like this:

Task task1 = Task.Run(() => Run1());
Task task2 = Task.Run(() => Run2());

¹ Provided that the ThreadPool is not saturated.
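
The difference can be seen with a small sketch. `FakeSubmitAsync` is a hypothetical method invented for this illustration: it has an asynchronous signature but blocks the caller, which is the situation described above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var direct = new ConcurrentQueue<string>();
var offloaded = new ConcurrentQueue<string>();

// Direct invocation: the first call runs to completion synchronously
// before the second one is even invoked.
await Task.WhenAll(
    RunAsync("A", direct),
    RunAsync("B", direct));

// Offloaded to the ThreadPool: both loops can make progress at the same time.
await Task.WhenAll(
    Task.Run(() => RunAsync("A", offloaded)),
    Task.Run(() => RunAsync("B", offloaded)));

Console.WriteLine("Direct:    " + string.Join(", ", direct));
Console.WriteLine("Offloaded: " + string.Join(", ", offloaded));

async Task RunAsync(string prefix, ConcurrentQueue<string> log)
{
    for (int i = 1; i <= 2; i++)
    {
        await FakeSubmitAsync(prefix + i, log);
    }
}

// Hypothetical method with an asynchronous contract but a synchronous
// implementation: it blocks the caller and returns an already completed task.
Task FakeSubmitAsync(string name, ConcurrentQueue<string> log)
{
    log.Enqueue(name + " start");
    Thread.Sleep(50);
    log.Enqueue(name + " end");
    return Task.CompletedTask;
}
```

In the direct run every "A" entry is logged before the first "B" entry, because nothing ever yields. In the offloaded run the entries from the two loops typically interleave, subject to the ThreadPool caveat above.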

Theodor Zoulias
  • This is the right solution if `SubmitToMarket` changes the thread internally during its execution (always, or at least mostly). Otherwise the cold tasks would be needed. – BitLauncher Mar 20 '22 at 15:54
  • @BitLauncher cold tasks are useful almost exclusively when you want to defer the execution of a `Task` until something else has happened first. This is unlikely to be the case here. – Theodor Zoulias Mar 20 '22 at 16:27
-1

You should use Microsoft's Reactive Framework (aka Rx) - NuGet System.Reactive and add using System.Reactive.Linq; - then you can do this:

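// Select + Concat subscribes to one submission at a time, so each list is processed sequentially.
var query1 = lgo.ToObservable().Select(x => Observable.FromAsync(() => sClient.SubmitToMarket(x.Id))).Concat();
var query2 = opApi.ToObservable().Select(x => Observable.FromAsync(() => sClient.SubmitToMarket(x.Id))).Concat();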

await query1.Merge(query2);

Alternatively, if you don't want the beauty of Rx, then just do this for your WhenAll (note that this only makes the call compile; the already started tasks still run concurrently):

await Task.WhenAll(lgoTasks.Concat(opApiTasks));
Enigmativity