I have two asynchronous sequences that I want to "zip" in pairs, and for this purpose I used the Zip operator from the System.Linq.Async package. This operator behaves in an undesirable way though, at least for my case. Instead of enumerating the two sequences concurrently, it enumerates them sequentially, so their latencies add up. Each of my sequences emits an element every second on average, and I expected the combined sequence to also emit a zipped pair every second, but in reality I get one pair every 2 seconds. Below is a minimal example that demonstrates this behavior:

static async IAsyncEnumerable<int> First()
{
    for (int i = 1; i <= 5; i++) { await Task.Delay(1000); yield return i; }
}

static async IAsyncEnumerable<int> Second()
{
    for (int i = 1; i <= 5; i++) { await Task.Delay(1000); yield return i; }
}

var stopwatch = Stopwatch.StartNew();
await foreach (var pair in First().Zip(Second()))
    Console.WriteLine(pair);
Console.WriteLine($"Duration: {stopwatch.ElapsedMilliseconds:#,0} msec");

Output:

(1, 1)
(2, 2)
(3, 3)
(4, 4)
(5, 5)
Duration: 10,155 msec

Is there any way to Zip these two sequences so that the program completes in 5 seconds instead of 10? I am interested in either a custom operator or a combination of operators from the official packages that has the desired behavior.

Theodor Zoulias

1 Answer

Something like this appears to work:

public static async IAsyncEnumerable<(TFirst, TSecond)> Zip<TFirst, TSecond>(this IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second)
{
    await using var e1 = first.GetAsyncEnumerator();
    await using var e2 = second.GetAsyncEnumerator();

    while (true)
    {
        // Start both moves before awaiting, so the two sequences advance concurrently.
        // AsTask() is needed because a ValueTask must not be consumed more than once.
        var t1 = e1.MoveNextAsync().AsTask();
        var t2 = e2.MoveNextAsync().AsTask();
        await Task.WhenAll(t1, t2);

        // Stop as soon as either sequence is exhausted.
        if (!t1.Result || !t2.Result)
            yield break;

        yield return (e1.Current, e2.Current);
    }
}

Of course, this misses things like null checks, so it could do with some improvements; that's left as an exercise for the reader.
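Null checks are slightly subtle in an async iterator, because the iterator body does not run until the sequence is first enumerated. One common pattern is to validate in a non-iterator wrapper and delegate to a local iterator. The following is only a sketch of that pattern; the names `AsyncZipExtensions` and `ZipConcurrently` are hypothetical and are not part of System.Linq.Async.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncZipExtensions
{
    // Hypothetical name, chosen to avoid clashing with the package's own Zip.
    public static IAsyncEnumerable<(TFirst, TSecond)> ZipConcurrently<TFirst, TSecond>(
        this IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second)
    {
        // This method is not an iterator, so these checks run at call time
        // rather than on the first MoveNextAsync.
        if (first is null) throw new ArgumentNullException(nameof(first));
        if (second is null) throw new ArgumentNullException(nameof(second));
        return Core(first, second);

        static async IAsyncEnumerable<(TFirst, TSecond)> Core(
            IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second)
        {
            await using var e1 = first.GetAsyncEnumerator();
            await using var e2 = second.GetAsyncEnumerator();

            while (true)
            {
                // Same loop as in the answer: start both moves, then wait for both.
                var t1 = e1.MoveNextAsync().AsTask();
                var t2 = e2.MoveNextAsync().AsTask();
                await Task.WhenAll(t1, t2);

                if (!t1.Result || !t2.Result)
                    yield break;

                yield return (e1.Current, e2.Current);
            }
        }
    }
}
```

With this shape, `First().ZipConcurrently(null)` throws `ArgumentNullException` immediately, before any `await foreach` starts.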

I'm also not convinced that the `Task.WhenAll` is any better than `bool r1 = await t1; bool r2 = await t2; if (!r1 || !r2) yield break;` here.
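For comparison, here is a rough sketch of that alternative without `Task.WhenAll` or `AsTask()`. It awaits each `ValueTask<bool>` exactly once and takes the result from the `await` expression itself. It also awaits the second task when the first one throws, on the assumption that an enumerator should not be disposed while its `MoveNextAsync` is still pending. The names `AsyncZipSketch` and `ZipAwaitingBoth` are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncZipSketch
{
    // Hypothetical operator name; not part of any official package.
    public static async IAsyncEnumerable<(TFirst, TSecond)> ZipAwaitingBoth<TFirst, TSecond>(
        this IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second)
    {
        await using var e1 = first.GetAsyncEnumerator();
        await using var e2 = second.GetAsyncEnumerator();

        while (true)
        {
            // Start both moves before awaiting either, so they overlap in time.
            ValueTask<bool> t1 = e1.MoveNextAsync();
            ValueTask<bool> t2 = e2.MoveNextAsync();

            bool r1;
            try
            {
                r1 = await t1;
            }
            catch
            {
                // Let the second move finish before the enumerators are disposed.
                // Its own exception, if any, is dropped in favor of the first one.
                try { await t2; } catch { }
                throw;
            }
            bool r2 = await t2;

            if (!r1 || !r2)
                yield break;

            yield return (e1.Current, e2.Current);
        }
    }
}
```

Awaiting `t1` and then `t2` in sequence does not serialize the work, because both operations were already started before the first `await`.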

canton7
  • I would do the "i'm not convinced" part. It will be just as efficient, if not more. However, make sure that you await both of the `ValueTask` even if one of them throws. – Cory Nelson Jan 05 '22 at 17:19
  • Also for a slight performance boost I would avoid GC by reusing the same array on each loop – Charlieface Jan 06 '22 at 01:21
  • I was actually using ValueTask unsafely - edited to fix. I think re-implementing `Task.WhenAll` for ValueTask is complex enough that it's out of scope here -- that's a separate, harder problem. – canton7 Jan 06 '22 at 08:50
  • @canton7 it has been asked here: [Task.WhenAll for ValueTask](https://stackoverflow.com/questions/45689327/task-whenall-for-valuetask) – Theodor Zoulias Jan 06 '22 at 09:34
  • @TheodorZoulias, but with a `ValueTask` it would be incorrect to await and then access the result in separate statements? That would be a problem when `TFirst` and `TSecond` are different types. – Jodrell Sep 13 '22 at 15:54
  • @Jodrell yes, it is incorrect to `await` and then access the result of a `ValueTask`. Also in this specific case both `ValueTask`s have the same generic type `T`. Related question for tasks: [Awaiting multiple Tasks with different results](https://stackoverflow.com/questions/17197699/awaiting-multiple-tasks-with-different-results). I am not aware if a similar question has been asked for value-tasks. – Theodor Zoulias Sep 13 '22 at 19:06