
I want to generate an enumerable of tasks that will complete at different times.

How can I make a generator in C# that:

  • yields tasks
  • every few iterations, resolves previously yielded tasks with results that are only now known

I want to do this because I am processing a long iterable of inputs, and every so often I accumulate enough data from these inputs to send a batch API request and finalise the corresponding outputs.

Pseudocode:

IEnumerable<Task<Output>> Process(IEnumerable<Input> inputs)
{
    var queuedInputs = new Queue<Input>();
    var cumulativeLength = 0;
    foreach (var input in inputs)
    {
        yield return waiting task for this input
        queuedInputs.Enqueue(input);
        cumulativeLength += input.Length;
        if (cumulativeLength > 10)
        {
            cumulativeLength = 0;
            GetFromAPI(queuedInputs).ContinueWith((apiTask) => {
                Queue<BatchResult> batchResults = apiTask.Result;
                while (queuedInputs.Count > 0)
                {
                    var batchResult = batchResults.Dequeue();
                    var historicalInput = queuedInputs.Dequeue();
                    var output = MakeOutput(historicalInput, batchResult);
                    resolve earlier input's task with this output
                }
            });
        }
    }
}
theonlygusti
  • The `task.ContinueWith(result => ...);` method returns a task, which is ignored in your example. Is it really your intention to launch the continuations in a [fire-and-forget](https://stackoverflow.com/questions/61316504/proper-way-to-start-and-async-fire-and-forget-call/61320933#61320933) manner? Also are you aware of the `ContinueWith` [intricacies](https://blog.stephencleary.com/2013/10/continuewith-is-dangerous-too.html) regarding the `TaskScheduler` that is used by default, when the `scheduler` parameter is not supplied? – Theodor Zoulias Jan 24 '22 at 04:43
  • `TaskCompletionSource` is the C# equivalent of creating and resolving a Promise manually. But I'm not sure what you're trying to do. – Jeremy Lakeman Jan 24 '22 at 05:04
  • @TheodorZoulias what's wrong with fire and forget? I have no clue about any intricacies of any TaskScheduler – theonlygusti Jan 24 '22 at 12:43
  • theonlygusti I have provided two links in my previous comment, that offer some explanation about why using fire-and-forget and `ContinueWith` is generally not a good idea. My understanding is that you are trying to do things with C# that are normal in JavaScript, but unidiomatic and unconventional in the C# world. Whatever you are trying to do, there are probably better ways to do it, using potentially `IAsyncEnumerable`s and the System.Linq.Async package, or the TPL Dataflow library. If you want more targeted advice, you should describe the broader problem that you are trying to solve. – Theodor Zoulias Jan 24 '22 at 16:21
  • You can read [here](https://github.com/dotnet/runtime/issues/58692 "Then or ContinueWithResult extension method for Task") Microsoft's opinion about adding a `Then` method for tasks, which is (to my understanding) closer to the Javascript `then` method than the existing `ContinueWith`. – Theodor Zoulias Jan 24 '22 at 16:27

4 Answers


One approach is to use the TPL Dataflow library. It offers a variety of components called "blocks" (TransformBlock, ActionBlock etc.), where each block processes its input data and then propagates the results to the next block. The blocks are linked together so that the completion of one block triggers the completion of the next, and so on down to the final block, which is usually an ActionBlock<T> with no output. Here is an example:

var block1 = new TransformBlock<int, string>(item =>
{
    Thread.Sleep(1000); // Simulate synchronous work
    return item.ToString();
}, new()
{
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    EnsureOrdered = false
});

var block2 = new BatchBlock<string>(batchSize: 10);

var block3 = new ActionBlock<string[]>(async batch =>
{
    await Task.Delay(1000); // Simulate asynchronous work
}); // The default MaxDegreeOfParallelism is 1

block1.LinkTo(block2, new() { PropagateCompletion = true });
block2.LinkTo(block3, new() { PropagateCompletion = true });

// Provide some input in the pipeline
block1.Post(1);
block1.Post(2);
block1.Post(3);
block1.Post(4);
block1.Post(5);

block1.Complete(); // Mark the first block as completed
await block3.Completion; // Await the completion of the last block

The TPL Dataflow library is powerful and flexible, but it has a weak point in the propagation of exceptions. There is no built-in way to instruct block1 to stop working if block3 fails. You can read more about this issue here. It might not be a serious issue if you don't expect your blocks to fail very often.

Theodor Zoulias
  • A custom `BatchBlock` that creates batches based on some "weight" or "size" property can be found [here](https://stackoverflow.com/questions/69459634/how-to-search-vast-code-base-for-multiple-literal-strings-efficiently/69462972#69462972) (`CreateDynamicBatchBlock` method). – Theodor Zoulias Jan 24 '22 at 19:53

The shape of your solution is going to be driven by the shape of your problem. There's a couple of questions I have because your problem domain seems odd:

  1. Are all your inputs known at the outset? The (synchronous) IEnumerable<Input> implies they are.
  2. Are you sure you want to wait for a batch of inputs before sending any query? What about the "remainder" if you're batching by 10 but have 55 inputs?

Assuming you do have synchronous inputs, and that you want to batch with remainders, you can just accumulate all your inputs immediately, batch them, and walk the batches, asynchronously providing outputs:

async IAsyncEnumerable<Output> Process(IEnumerable<Input> inputs)
{
  foreach (var batchedInput in inputs.Batch(10))
  {
    var batchResults = await GetFromAPI(batchedInput);
    for (int i = 0; i != batchedInput.Count; ++i)
      yield return MakeOutput(batchedInput[i], batchResults[i]);
  }
}

public static IEnumerable<IReadOnlyList<TSource>> Batch<TSource>(this IEnumerable<TSource> source, int size)
{
  List<TSource>? batch = null;
  foreach (var item in source)
  {
    batch ??= new List<TSource>(capacity: size);
    batch.Add(item);
    if (batch.Count == size)
    {
      yield return batch;
      batch = null;
    }
  }

  if (batch?.Count > 0)
    yield return batch;
}

Update:

If you want to start the API calls immediately, you can move those out of the loop:

async IAsyncEnumerable<Output> Process(IEnumerable<Input> inputs)
{
  var batchedInputs = inputs.Batch(10).ToList();
  var apiCallTasks = batchedInputs.Select(GetFromAPI).ToList();
  for (int i = 0; i != apiCallTasks.Count; ++i)
  {
    var batchResults = await apiCallTasks[i];
    var batchedInput = batchedInputs[i];
    for (int j = 0; j != batchedInput.Count; ++j)
      yield return MakeOutput(batchedInput[j], batchResults[j]);
  }
}
Stephen Cleary
  • I don't know how large the batches will be, it depends on some cumulative property of the inputs. Any remainder should just be processed at the end. – theonlygusti Jan 24 '22 at 19:31
  • @theonlygusti: You can modify the `Batch` method then to handle that logic. – Stephen Cleary Jan 24 '22 at 19:34
  • On .NET 6 the [`Chunk`](https://learn.microsoft.com/en-us/dotnet/api/system.linq.enumerable.chunk) LINQ operator is available. For batching based on some arbitrary property, I have posted an implementation [here](https://stackoverflow.com/questions/59152658/group-files-into-500mb-chunks/59154749#59154749) (`BatchBySize`). – Theodor Zoulias Jan 24 '22 at 19:39
  • What's the most recommended way to use `async IAsyncEnumerable` as in this solution, but pre-emptively call the `GetFromAPI`s so that after every batch there might not have to be as long a pause as is current? – theonlygusti Jan 24 '22 at 22:50
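
For the case raised in the comments above, where batch boundaries depend on a cumulative property of the inputs rather than a fixed count, the `Batch` method could be adapted roughly as follows. This is only a sketch: `BatchByWeight`, its `weight` selector and its `threshold` parameter are hypothetical names, not part of LINQ or of any implementation linked in this thread. It closes a batch as soon as the running total exceeds the threshold, mirroring the `cumulativeLength > 10` check in the question, and emits any remainder at the end.

```csharp
using System;
using System.Collections.Generic;

public static class WeightedBatchExtensions
{
    // Hypothetical helper: closes a batch once the summed weight of its
    // items exceeds the threshold, then emits any remainder at the end.
    public static IEnumerable<IReadOnlyList<T>> BatchByWeight<T>(
        this IEnumerable<T> source, Func<T, int> weight, int threshold)
    {
        var batch = new List<T>();
        var total = 0;
        foreach (var item in source)
        {
            batch.Add(item);
            total += weight(item);
            if (total > threshold)
            {
                yield return batch;
                batch = new List<T>(); // start a fresh list; the old one belongs to the caller
                total = 0;
            }
        }

        if (batch.Count > 0)
            yield return batch; // remainder that never crossed the threshold
    }
}
```

With this in place, `inputs.Batch(10)` in the answer above would become something like `inputs.BatchByWeight(x => x.Length, 10)`, assuming `Input` exposes a `Length` as in the question.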

Assuming MyGenerator() returns List<Task<T>>, and the number of tasks is relatively small (even in the hundreds is probably fine) then you can use Task.WhenAny(), which returns the first Task that completes. Then remove that Task from the list, process the result, and move on to the next:

var tasks = MyGenerator();
while (tasks.Count > 0) {
    var t = await Task.WhenAny(tasks);
    tasks.Remove(t);

    var result = await t; // this won't actually wait since the task is already done
    // Do something with result
}

There is a good discussion of this in an article by Stephen Toub, which explains in more detail, and gives alternatives if your task list is in the thousands: Processing tasks as they complete

There's also this article, but I think Stephen's is better written: Process asynchronous tasks as they complete (C#)

Gabriel Luci
  • Implementing `MyGenerator()` is the challenging part to me – theonlygusti Jan 24 '22 at 14:46
  • Maybe that second article will help then. In that example, it takes a list of URLs and creates a list of tasks that downloads from each URL. – Gabriel Luci Jan 24 '22 at 14:51
  • But I have no trouble with how to deal with a list of tasks. I am struggling to figure out how to build the list of tasks in the first place – theonlygusti Jan 24 '22 at 15:03
  • Your question doesn't have enough information to help you with that. What is each task actually doing? – Gabriel Luci Jan 24 '22 at 15:22
  • *"which returns the first Task that completes."* -- Gabriel to be precise it returns the left-most task in the `tasks` list that is completed. If you are doing anything non-trivial inside the `while` loop, it's possible for the completed tasks to start accumulating, and then you'll observe the left-side bias. – Theodor Zoulias Jan 24 '22 at 16:35

Using TaskCompletionSource:

IEnumerable<Task<Output>> Process(IEnumerable<Input> inputs)
{
    var tcss = new List<TaskCompletionSource<Output>>();
    var queue = new Queue<(Input, TaskCompletionSource<Output>)>();
    var cumulativeLength = 0;
    foreach (var input in inputs)
    {
        var tcs = new TaskCompletionSource<Output>();
        queue.Enqueue((input, tcs));
        tcss.Add(tcs);
        cumulativeLength += input.Length;
        if (cumulativeLength > 10)
        {
            cumulativeLength = 0;
            var queueClone = new Queue<(Input, TaskCompletionSource<Output>)>(queue);
            queue.Clear();
            GetFromAPI(queueClone.Select(x => x.Item1)).ContinueWith((apiTask) => {
                Queue<BatchResult> batchResults = apiTask.Result;
                while (queueClone.Count > 0)
                {
                    var batchResult = batchResults.Dequeue();
                    var (queuedInput, queuedTcs) = queueClone.Dequeue();
                    var output = MakeOutput(queuedInput, batchResult);
                    queuedTcs.SetResult(output);
                }
            });
        }
    }
    GetFromAPI(queue.Select(x => x.Item1)).ContinueWith((apiTask) => {
        Queue<BatchResult> batchResults = apiTask.Result;
        while (queue.Count > 0)
        {
            var batchResult = batchResults.Dequeue();
            var (queuedInput, queuedTcs) = queue.Dequeue();
            var output = MakeOutput(queuedInput, batchResult);
            queuedTcs.SetResult(output);
        }
    });
    foreach (var tcs in tcss)
    {
        yield return tcs.Task;
    }
}
theonlygusti
  • You should use `await` instead of `ContinueWith`. When using a `TaskCompletionSource`, strongly consider `RunContinuationsAsynchronously`. The use of queues is a bit odd; I believe these could just as easily be lists (with pre-allocated capacity). There's definitely logic flows (e.g., error situations) where TCS's will never be completed. And the whole logic ends up calling the API k times before the first item is yielded, so it's way more complex than necessary - just batch the inputs, call the API that many times, and yield return the resulting tasks. – Stephen Cleary Jan 26 '22 at 03:00
  • @StephenCleary doesn't there seem to be an advantage that calling the API k times before first yield means that the calling code doesn't have to wait as long in-between batches? – theonlygusti Jan 27 '22 at 00:22
  • Yes. What I mean by "more complex than necessary" is that you can just call the API k times right after batching, and then foreach through the resulting tasks, awaiting each one. I'll update my answer with the code. – Stephen Cleary Jan 27 '22 at 14:07
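
The points in the comments above (use `await` rather than `ContinueWith`, create each `TaskCompletionSource` with `RunContinuationsAsynchronously`, and make sure every source is completed even when the API call fails) could be combined along these lines. This is a minimal sketch under stated assumptions, not a definitive implementation: `BatchedTasks`, `CompleteBatchAsync`, `weight`, `threshold` and `callApi` are hypothetical names, and `callApi` is assumed to return one output per input in the same order, standing in for `GetFromAPI` plus `MakeOutput`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BatchedTasks
{
    // Yields one task per input. All tasks of a batch complete together,
    // once the (assumed) batch API call for that batch has returned.
    public static IEnumerable<Task<TOut>> Process<TIn, TOut>(
        IEnumerable<TIn> inputs,
        Func<TIn, int> weight,
        int threshold,
        Func<IReadOnlyList<TIn>, Task<IReadOnlyList<TOut>>> callApi)
    {
        var pending = new List<(TIn Input, TaskCompletionSource<TOut> Tcs)>();
        var total = 0;
        foreach (var input in inputs)
        {
            var tcs = new TaskCompletionSource<TOut>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            pending.Add((input, tcs));
            total += weight(input);
            if (total > threshold)
            {
                // Errors are routed into the batch's tasks, so discarding is safe here.
                _ = CompleteBatchAsync(pending, callApi);
                pending = new List<(TIn Input, TaskCompletionSource<TOut> Tcs)>();
                total = 0;
            }
            yield return tcs.Task;
        }

        if (pending.Count > 0)
            _ = CompleteBatchAsync(pending, callApi); // remainder
    }

    private static async Task CompleteBatchAsync<TIn, TOut>(
        List<(TIn Input, TaskCompletionSource<TOut> Tcs)> batch,
        Func<IReadOnlyList<TIn>, Task<IReadOnlyList<TOut>>> callApi)
    {
        try
        {
            var results = await callApi(batch.Select(p => p.Input).ToList())
                .ConfigureAwait(false);
            for (var i = 0; i < batch.Count; i++)
                batch[i].Tcs.TrySetResult(results[i]);
        }
        catch (Exception ex)
        {
            // Fault every task that has not completed yet, so none is left pending forever.
            foreach (var (_, tcs) in batch)
                tcs.TrySetException(ex);
        }
    }
}
```

Because `Process` is an iterator, a batch is only sent when the consumer has enumerated far enough to fill it, and the remainder is only sent when enumeration reaches the end. A consumer that awaits each task before asking for the next one would therefore wait forever on the first item of an unfilled batch; materialising the sequence first (for example with `ToList()`) and then awaiting the tasks avoids that.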