2

I am trying to achieve a supplier and consumer pattern where I am calling 4 simultaneous external APIs in parallel and adding the results to a non-blocking collection and running a CPU-intensive task which watches results in the non-blocking collection, does a calculation as each result comes in and moves the results to another collection. Pseudo-code:

Task task1 = CallApiAndMoveToCollectionResult1();
Task task2 = CallApiAndMoveToCollectionResult1();
Task task3 = CallApiAndMoveToCollectionResult1();
Task task4 = CallApiAndMoveToCollectionResult1();
await Task.WhenAll(task1,task2,task3,task4);

WatchCollectionAndCalculateBatches();

How can I run synchronous WatchCollectionAndCalculateBatches() at the same time as await Task.WhenAll(task1,task2,task3,task4) which is asynchronous?

FailedUnitTest
  • 1,637
  • 3
  • 20
  • 43
  • 1
    I wonder if MS _TPL DataFlow_ might be better suited for your task? Good luck –  Sep 10 '18 at 02:31
  • 4
    Just remove the line with `await` or move it below `WatchCollectionAndCalculateBatches()`. –  Sep 10 '18 at 03:26
  • What do those methods do? Does `WatchCollectionAndCalculateBatches` check the *results* of the async methods? Why run it synchronously then? It could be a timed operation that checks the results, or a async loop. You could use a ConcurrentQueue to gather results. Or you could use `Progress` from each task to report progress – Panagiotis Kanavos Sep 10 '18 at 12:05
  • Also check MickyD's suggestion. If you call the same method in parallel, you could use an `ActionBlock` that calls your method to process each incoming message with a DOP =4. It all depends on what you're actually doing. – Panagiotis Kanavos Sep 10 '18 at 12:10
  • @Panagiotis Kanavos yes WatchCollectionAndCalculateBatches checks the results of the async methods. It essentially has an infinite while loop that checks criteria and moves the result elsewhere, it only stops when the non-blocking collection has CompleteAdding() called. – FailedUnitTest Sep 10 '18 at 13:59

1 Answers1

0

You can offload the synchronous WatchCollectionAndCalculateBatches to the ThreadPool, using the Task.Run method:

Task task1 = CallApiAndMoveToCollectionResult1();
Task task2 = CallApiAndMoveToCollectionResult1();
Task task3 = CallApiAndMoveToCollectionResult1();
Task task4 = CallApiAndMoveToCollectionResult1();
Task task5 = Task.Run(() => WatchCollectionAndCalculateBatches());
await Task.WhenAll(task1, task2, task3, task4, task5);

...or you can run it synchronously on the current thread, using the Task constructor and the RunSynchronously method:

Task task1 = CallApiAndMoveToCollectionResult1();
Task task2 = CallApiAndMoveToCollectionResult1();
Task task3 = CallApiAndMoveToCollectionResult1();
Task task4 = CallApiAndMoveToCollectionResult1();
Task task5 = new Task(() => WatchCollectionAndCalculateBatches());
task5.RunSynchronously(TaskScheduler.Default);
await Task.WhenAll(task1, task2, task3, task4, task5);

Creating and awaiting all 5 tasks at once ensures that in case any of the task fails, the rest will not become fire-and-forget. But be careful: in case the completion of a task depends on another task, then awaiting both of them with Task.WhenAll might result in a deadlock. This will happen if the second task fails, in which case the first task will never complete, so the Task.WhenAll will never complete either.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104