
I have a few IAsyncEnumerable<string>s that I would like to merge into a single IAsyncEnumerable<string>, which should contain all the elements emitted concurrently by those sequences. So I used the Merge operator from the System.Interactive.Async package. The problem is that this operator does not always treat all sequences as equal. In some circumstances it prefers emitting elements from the sequences on the left side of the arguments list, and neglects the sequences on the right side. Here is a minimal example that reproduces this undesirable behavior:

var sequence_A = Enumerable.Range(1, 5).Select(i => $"A{i}").ToAsyncEnumerable();
var sequence_B = Enumerable.Range(1, 5).Select(i => $"B{i}").ToAsyncEnumerable();
var sequence_C = Enumerable.Range(1, 5).Select(i => $"C{i}").ToAsyncEnumerable();
var merged = AsyncEnumerableEx.Merge(sequence_A, sequence_B, sequence_C);
await foreach (var item in merged) Console.WriteLine(item);

This code snippet also depends on the System.Linq.Async package. The sequence_A emits 5 elements prefixed with "A", the sequence_B emits 5 elements prefixed with "B", and the sequence_C emits 5 elements prefixed with "C".

Output (undesirable):

A1
A2
A3
A4
A5
B1
B2
B3
B4
B5
C1
C2
C3
C4
C5

Try it on Fiddle.

The desirable output should look like this:

A1
B1
C1
A2
B2
C2
A3
B3
C3
A4
B4
C4
A5
B5
C5

In case all sequences have their next element available, the merged sequence should pull one element from each sequence, instead of pulling elements repeatedly from the left-most sequence.

How can I ensure that my sequences are merged with fairness? I am looking for a combination of operators from the official packages that has the desirable behavior, or for a custom Merge operator that does what I want.

Clarification: I am interested in the concurrent Merge functionality, where all source sequences are observed at the same time, and any emission from any of the sequences is propagated to the merged sequence. The concept of fairness applies when more than one sequence can emit an element immediately, in which case their emissions should be interleaved. In the opposite case, when no element is immediately available, the rule is "first to come, first to go".


Update: Here is a more realistic demo that includes latency in the producer sequences and in the consuming enumeration loop. It simulates a situation where consuming the values produced by the left-most sequence takes longer than the time required for producing those values.

var sequence_A = Produce("A", 200, 1, 2, 3, 4, 5);
var sequence_B = Produce("B", 150, 1, 2, 3, 4, 5);
var sequence_C = Produce("C", 100, 1, 2, 3, 4, 5);
var merged = AsyncEnumerableEx.Merge(sequence_A, sequence_B, sequence_C);
await foreach (var item in merged)
{
    Console.WriteLine(item);
    await Task.Delay(item.StartsWith("A") ? 300 : 50); // Latency
}

async IAsyncEnumerable<string> Produce(string prefix, int delay, params int[] values)
{
    foreach (var value in values)
    {
        var delayTask = Task.Delay(delay);
        yield return $"{prefix}{value}";
        await delayTask; // Latency
    }
}

The result is an undesirable bias toward the values produced by sequence_A:

A1
A2
A3
A4
A5
B1
B2
C1
B3
C2
B4
C3
C4
B5
C5

Try it on Fiddle.

Theodor Zoulias
  • This is going to impact the asynchronicity. The benefit of doing it "left first" is that the other enumerables are not yet being enumerated. In order to load A1-B1-C1, you'd be enumerating all collections immediately (well, within the first three accessed elements). Is maintaining asynchronicity relevant for your solution? A synchronous solution seems straightforward. – Flater Jan 14 '22 at 11:56
  • @Flater yes, definitely I want the merged sequence to be asynchronous. My expectation about the `Merge` operator is that it observes concurrently all supplied sequences for new emissions. It shouldn't just enumerate each sequence independently and sequentially. That's what the `Concat` operator is for! – Theodor Zoulias Jan 14 '22 at 12:01
  • I suspect it's going to be nigh impossible to avoid breaking your asynchronicity here, since in order to account for collections of varying length, you need to know their length to avoid going out of range, which in turn requires enumeration. I'm not sure that the _tiny_ optimization of deferring the second/third enumeration to the second/third element being accessed is worth the additional cost of constantly having to check the `Count()` afterwards; as opposed to simply enumerating them into a list immediately and then very easily returning the elements in the right order. – Flater Jan 14 '22 at 12:13
  • @Flater TBH I have already coded a solution, that I intend to post later (probably tomorrow after I have double checked it) as a [self-answer](https://stackoverflow.com/help/self-answer), so it's definitely possible. :-) – Theodor Zoulias Jan 14 '22 at 12:17
  • I'm interested to see it. – Flater Jan 14 '22 at 12:20
  • The example is a bit contrived as all results are available at the same time. What if there were random delays between elements? Would it still prefer the left sequence? The method docs [explicitly say it *isn't* left-first](https://github.com/dotnet/reactive/blob/305d381dcc46c5966e7260e0959b3083846bf05f/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Merge.cs#L25) – Panagiotis Kanavos Jan 14 '22 at 14:08
  • I can't reproduce the problem with live results – Panagiotis Kanavos Jan 14 '22 at 14:29
  • @PanagiotisKanavos I'll update the question with an example that incorporates latency in the producers and the consumer. It's true that if you add latency in the source sequences, and don't add latency in the enumeration of the merged sequence, the problem disappears. But in my case I have latency in the consumer as well, because I have some asynchronous work to do every time I receive a merged value. – Theodor Zoulias Jan 14 '22 at 14:50
  • @TheodorZoulias I already posted an example that contains a latency and results that show there's no preference. What you ask seems to be to interleave the streams, not merge them – Panagiotis Kanavos Jan 14 '22 at 14:51
  • @TheodorZoulias that said, System.Linq.Async isn't Go, and `Merge` isn't `select` - yet. My personal annoyance are the operators that have to consume the entire source before producing results. The `System.Linq.Async` operators are closer to LINQ than Rx, which can cause problems, especially with infinite streams – Panagiotis Kanavos Jan 14 '22 at 14:57
  • Related: [Merge multiple IAsyncEnumerable streams](https://stackoverflow.com/questions/70658393/merge-multiple-iasyncenumerable-streams). – Theodor Zoulias Jun 22 '22 at 09:19

3 Answers


Here is the final code. The algorithm has been modified to suit the OP. I have left the original code below.

This uses a greedy algorithm: the first available value is returned, and no attempt is made to take values from the sources in strict turn. Each time a task finishes, the next task for the same enumerator goes to the back of the list, ensuring fairness.

The algorithm is as follows:

  • The function accepts a params array of sources.
  • Early bail-out if no source enumerables are provided.
  • Create a list to hold the enumerators along with their respective tasks as tuples.
  • Get each enumerator, call MoveNextAsync and store the pair in the list.
  • In a loop, call Task.WhenAny on the whole list.
  • Take the resulting Task and find its location in the list.
  • Hold the tuple in a variable and remove it from the list.
  • If it returns true, then yield the value and call MoveNextAsync again for the matching enumerator, pushing the resulting tuple to the back of the list.
  • If it returns false, then dispose the enumerator.
  • Continue looping until the list is empty.
  • A finally block disposes any remaining enumerators.
  • There is also an overload that accepts a cancellation token.

There are some efficiencies to be had in terms of allocations, etc. I've left that as an exercise for the reader.


 public static IAsyncEnumerable<T> Interleave<T>(params IAsyncEnumerable<T>[] sources) =>
     Interleave(default, sources);
 
 public static async IAsyncEnumerable<T> Interleave<T>([EnumeratorCancellation] CancellationToken token, IAsyncEnumerable<T>[] sources)
 {
     if(sources.Length == 0)
         yield break;
     var enumerators = new List<(IAsyncEnumerator<T> e, Task<bool> t)>(sources.Length);
     try
     {
         for(var i = 0; i < sources.Length; i++)
         {
             var e = sources[i].GetAsyncEnumerator(token);
             enumerators.Add((e, e.MoveNextAsync().AsTask()));
         }

         do
         {
             var taskResult = await Task.WhenAny(enumerators.Select(tuple => tuple.t));
             var ind = enumerators.FindIndex(tuple => tuple.t == taskResult);
             var tuple = enumerators[ind];
             enumerators.RemoveAt(ind);
             if(await taskResult)
             {
                 yield return tuple.e.Current;
                 enumerators.Add((tuple.e, tuple.e.MoveNextAsync().AsTask()));
             }
             else
             {
                 try
                 {
                     await tuple.e.DisposeAsync();
                 }
                 catch
                 { //
                 }
             }
         } while (enumerators.Count > 0);
     }
     finally
     {
         for(var i = 0; i < enumerators.Count; i++)
         {
             try
             {
                 await enumerators[i].e.DisposeAsync();
             }
             catch
             { //
             }
         }
     }
 }

dotnetfiddle
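
A minimal usage sketch with the Produce sequences from the question's update, assuming the Interleave method above is in scope (for example, declared in the same static class as the calling code):

var sequence_A = Produce("A", 200, 1, 2, 3, 4, 5);
var sequence_B = Produce("B", 150, 1, 2, 3, 4, 5);
var sequence_C = Produce("C", 100, 1, 2, 3, 4, 5);

await foreach (var item in Interleave(sequence_A, sequence_B, sequence_C))
{
    Console.WriteLine(item); // With fair interleaving, A/B/C values should alternate
    await Task.Delay(item.StartsWith("A") ? 300 : 50); // Consumer latency, as in the question
}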


EDIT: The below isn't quite what the OP wanted, as the OP wants any result to be returned, whichever comes first. I'll leave this here because it's a good demonstration of this algorithm.

Here is a full implementation of the async Interleave or Merge algorithm, known more commonly in SQL terms as a Merge-Concatenation.

The algorithm is as follows:

  • The function accepts a params array of sources.
  • Early bail-out if no source enumerables are provided.
  • Create a list to hold the enumerators.
  • Get each enumerator and store it in the list.
  • In a loop, take each enumerator and MoveNextAsync.
  • If it returns true, then yield the value and increment the loop counter. If it rolls over, go back to the beginning.
  • If it returns false, then dispose it and remove it from the list. Do not increment the counter.
  • Continue looping until there are no more enumerators.
  • A finally block disposes any remaining enumerators.
  • There is also an overload that accepts a cancellation token.

 public static IAsyncEnumerable<T> Interleave<T>(params IAsyncEnumerable<T>[] sources) =>
     Interleave(default, sources);
 
 public static async IAsyncEnumerable<T> Interleave<T>([EnumeratorCancellation] CancellationToken token, IAsyncEnumerable<T>[] sources)
 {
     if(sources.Length == 0)
         yield break;
     var enumerators = new List<IAsyncEnumerator<T>>(sources.Length);
     try
     {
         for(var i = 0; i < sources.Length; i++)
             enumerators.Add(sources[i].GetAsyncEnumerator(token));

         var j = 0;
         do
         {
             if(await enumerators[j].MoveNextAsync())
             {
                 yield return enumerators[j].Current;
                 j++;
                 if(j >= enumerators.Count)
                     j = 0;
             }
             else
             {
                 try
                 {
                     await enumerators[j].DisposeAsync();
                 }
                 catch
                 { //
                 }
                 enumerators.RemoveAt(j);
             }
         } while (enumerators.Count > 0);
     }
     finally
     {
         for(var i = 0; i < enumerators.Count; i++)
         {
             try
             {
                 await enumerators[i].DisposeAsync();
             }
             catch
             { //
             }
         }
     }
 }

dotnetfiddle

This can obviously be significantly simplified if you only have a fixed number of source enumerables.
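
For illustration, here is a rough sketch of what the simplification might look like for exactly two sources, following the same strict round-robin idea (the InterleaveTwo name and shape are only illustrative, not part of any package):

public static async IAsyncEnumerable<T> InterleaveTwo<T>(
    IAsyncEnumerable<T> first, IAsyncEnumerable<T> second,
    [EnumeratorCancellation] CancellationToken token = default)
{
    await using var e1 = first.GetAsyncEnumerator(token);
    await using var e2 = second.GetAsyncEnumerator(token);
    bool has1 = true, has2 = true;
    // Alternate between the two enumerators, skipping one once it is exhausted.
    while (has1 || has2)
    {
        if (has1 && (has1 = await e1.MoveNextAsync())) yield return e1.Current;
        if (has2 && (has2 = await e2.MoveNextAsync())) yield return e2.Current;
    }
}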

Charlieface
  • Thanks Charlieface for the answer. Your solution is close to what I want, but it's not exactly right. The requirement for the `Merge` is that an element must be emitted whenever **any** of the source sequences emits an element. For example if the `sequence_A` emits the value `"A1"` after 2 seconds and the `sequence_B` emits the value `"B1"` after 1 second, the merged sequence must emit the `"B1"` before the `"A1"`. So we can't just enumerate independently the source sequences in an interleaving manner. – Theodor Zoulias Jan 14 '22 at 14:42
  • Yeah I see it's not quite what you wanted. I'm developing another answer based on `Task.WhenAny` but it's got some bugs and I don't have time now, might post tomorrow or Sunday – Charlieface Jan 14 '22 at 15:58
  • @TheodorZoulias OK have modified it now, I think it does what you want. I couldn't find an easy way to do `ValueTask.WhenAny` so I've just used `AsTask`, which I admit is slightly less efficient – Charlieface Jan 15 '22 at 20:34
  • Hi Charlieface! Thanks for the update. The new algorithm ([revision 3](https://stackoverflow.com/revisions/70711683/3)) seems to have the correct behavior regarding the concurrency of the merging, but not the desirable behavior regarding the fairness. The left-side bias is still here! – Theodor Zoulias Jan 15 '22 at 21:39
  • I don't see that. In [this fiddle](https://dotnetfiddle.net/FFmgjn) with exactly the same delay for all, I see a pretty random result, different every time. You will always see a slight bias simply because one request is fired off first, however that is unlikely to be the dominant bias – Charlieface Jan 15 '22 at 21:46
  • Try commenting the `await Task.Delay(50);` inside the `Range` method, and [you'll see the bias](https://dotnetfiddle.net/iR8Tal)! – Theodor Zoulias Jan 15 '22 at 22:09
  • ...or add latency in the consuming `await foreach` loop, and [you'll see the bias again](https://dotnetfiddle.net/uhU7Yh). The left-side bias appears only when more than one sequences have immediately available elements. So if you consume aggressively the merged sequence in a tight loop, it's difficult to see it. – Theodor Zoulias Jan 15 '22 at 22:20
  • I personally don't believe this is likely to be an issue in a real async situation, but what you can do is push the new task to the back of the list. See modifications https://stackoverflow.com/revisions/70711683/4 – Charlieface Jan 16 '22 at 01:26
  • Now it's working as expected, thanks! TBH I am not so sure that the biased behavior of the [`AsyncEnumerableEx.Merge`](https://github.com/dotnet/reactive/blob/main/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Merge.cs) is not an issue. It's more likely that too few people actually use the asynchronous sequences to their full potential, and that's why the shortcomings of the Ix.NET library are not surfaced more frequently. – Theodor Zoulias Jan 16 '22 at 01:48
  • Not sure why I called this Merge Concatenation, that is a different algorithm to do with merging in a defined order (using a comparer on the values). Also there was a missing `await` in the original version at the top, the use of `Result` would prevent exceptions from unwrapping properly. – Charlieface Mar 26 '23 at 06:16
  • It's probably called `Merge` because that's how is called the [homologous operation](https://learn.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh229099(v=vs.103)) for observable sequences. And my understanding is that the Rx (Reactive Extensions) was designed before the Ix (Interactive Extensions). – Theodor Zoulias Mar 26 '23 at 06:23

I am posting one more answer, because I noticed some other minor defects in the current¹ AsyncEnumerableEx.Merge implementation that I would like to fix:

  1. Left-side bias. This is the main issue of this question, and has already been addressed sufficiently in Charlieface's answer. In this answer I am using the same interleaving technique.
  2. Destructive merging. In case one of the source sequences fails, the merged sequence is likely to complete without propagating all the values that are produced by the other source sequences. This makes the current AsyncEnumerableEx.Merge implementation not very suitable for producer-consumer scenarios, where processing all the consumed elements is mandatory. (A small repro sketch of this scenario follows the list.)
  3. Delayed completion. In case one of the sequences fails, or the enumeration of the merged sequence is abandoned, the disposal of the merged enumerator might take a lot of time, because the pending MoveNextAsync operations of the source enumerators are not canceled.
  4. Throwing on dispose. It is generally recommended that disposable resources should avoid throwing on Dispose or on DisposeAsync. Nevertheless the AsyncEnumerableEx.Merge implementation propagates normal operational errors (errors thrown by the MoveNextAsync) from the finally block.
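
A hypothetical repro sketch for points 2 and 3, reusing the Produce method from the question's update (the Failing sequence and the behavior described in the comments are illustrative, not verified output):

async IAsyncEnumerable<string> Failing()
{
    yield return "X1";
    await Task.Delay(100);
    throw new InvalidOperationException("Oops!");
}

var merged = AsyncEnumerableEx.Merge(Produce("A", 200, 1, 2, 3, 4, 5), Failing());
try
{
    await foreach (var item in merged) Console.WriteLine(item);
}
catch (Exception ex)
{
    // With the current Merge, the failure is expected to surface before all
    // the "A" values have been propagated (point 2), and the pending
    // MoveNextAsync operations are not canceled (point 3).
    Console.WriteLine($"Failed: {ex.Message}");
}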

The MergeEx implementation below is an attempt to fix these problems. It is a concurrent and non-destructive implementation that propagates all the consumed values. All the errors that are caught are preserved and propagated in an AggregateException.

/// <summary>
/// Merges elements from all source sequences, into a single interleaved sequence.
/// </summary>
public static IAsyncEnumerable<TSource> MergeEx<TSource>(
    params IAsyncEnumerable<TSource>[] sources)
{
    ArgumentNullException.ThrowIfNull(sources);
    sources = sources.ToArray(); // Defensive copy.
    if (sources.Any(s => s is null)) throw new ArgumentException(
        $"The {nameof(sources)} argument included a null value.", nameof(sources));
    return Implementation();

    async IAsyncEnumerable<TSource> Implementation(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (sources.Length == 0) yield break;
        cancellationToken.ThrowIfCancellationRequested();
        using var linkedCts = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);
        List<(IAsyncEnumerator<TSource>, Task<bool> MoveNext)> state = new();
        List<Exception> errors = new();
        void CancelCts()
        {
           try { linkedCts.Cancel(); }
           catch (AggregateException aex) { errors.AddRange(aex.InnerExceptions); }
        }

        try
        {
            // Create enumerators and initial MoveNextAsync tasks.
            foreach (var source in sources)
            {
                IAsyncEnumerator<TSource> enumerator;
                Task<bool> moveNext;
                try { enumerator = source.GetAsyncEnumerator(linkedCts.Token); }
                catch (Exception ex) { errors.Add(ex); break; }
                try { moveNext = enumerator.MoveNextAsync().AsTask(); }
                catch (Exception ex) { moveNext = Task.FromException<bool>(ex); }
                state.Add((enumerator, moveNext));
            }

            bool cancellationOccurred = false;

            // Loop until all enumerators are completed.
            while (state.Count > 0)
            {
                int completedIndex = -1;
                for (int i = 0; i < state.Count; i++)
                {
                    var status = state[i].MoveNext.Status;
                    if (status == TaskStatus.Faulted || status == TaskStatus.Canceled)
                    {
                        // Handle errors with priority.
                        completedIndex = i;
                        break;
                    }
                    else if (status == TaskStatus.RanToCompletion)
                    {
                        // Handle completion in order.
                        if (completedIndex == -1) completedIndex = i;
                        continue;
                    }
                }

                if (completedIndex == -1)
                {
                    // All MoveNextAsync tasks are currently in-flight.
                    await Task.WhenAny(state.Select(e => e.MoveNext))
                        .ConfigureAwait(false);
                    continue;
                }

                var (enumerator, moveNext) = state[completedIndex];
                Debug.Assert(moveNext.IsCompleted);
                (TSource Value, bool HasValue) item;
                try
                {
                    bool moved = await moveNext.ConfigureAwait(false);
                    item = moved ? (enumerator.Current, true) : default;
                }
                catch (OperationCanceledException)
                    when (linkedCts.IsCancellationRequested)
                {
                    // Cancellation from the linked token source is not an error.
                    item = default; cancellationOccurred = true;
                }
                catch (Exception ex) { errors.Add(ex); CancelCts(); item = default; }

                if (item.HasValue)
                    yield return item.Value;

                if (item.HasValue && errors.Count == 0)
                {
                    try { moveNext = enumerator.MoveNextAsync().AsTask(); }
                    catch (Exception ex) { moveNext = Task.FromException<bool>(ex); }
                    // Deprioritize the selected enumerator.
                    state.RemoveAt(completedIndex);
                    state.Add((enumerator, moveNext));
                }
                else
                {
                    // The selected enumerator has completed or an error has occurred.
                    state.RemoveAt(completedIndex);
                    try { await enumerator.DisposeAsync().ConfigureAwait(false); }
                    catch (Exception ex) { errors.Add(ex); CancelCts(); }
                }
            }

            if (errors.Count > 0)
                throw new AggregateException(errors);

            // Propagate cancellation only if it occurred during the loop.
            if (cancellationOccurred)
                cancellationToken.ThrowIfCancellationRequested();
        }
        finally
        {
            errors.Clear();
            // Cancel the enumerators, for more responsive completion.
            // Propagate only DisposeAsync errors.
            // Surface any other error through the
            // TaskScheduler.UnobservedTaskException event.
            try { linkedCts.Cancel(); }
            catch (Exception ex) { _ = Task.FromException(ex); }
            foreach (var (enumerator, moveNext) in state)
            {
                if (!moveNext.IsCompleted)
                {
                    // The last moveNext must be completed before disposing.
                    await Task.WhenAny(moveNext).ConfigureAwait(false);
                }
                try { await enumerator.DisposeAsync().ConfigureAwait(false); }
                catch (Exception ex) { errors.Add(ex); }
            }
            if (errors.Count > 0)
                throw new AggregateException(errors);
        }
    }
}
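
A minimal usage sketch, assuming the MergeEx method above is in scope and reusing the Produce method from the question's update:

var merged = MergeEx(
    Produce("A", 200, 1, 2, 3, 4, 5),
    Produce("B", 150, 1, 2, 3, 4, 5),
    Produce("C", 100, 1, 2, 3, 4, 5));

await foreach (var item in merged)
{
    Console.WriteLine(item);
    await Task.Delay(item.StartsWith("A") ? 300 : 50); // Consumer latency, as in the question
}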

¹ System.Interactive.Async version 6.0.1

Theodor Zoulias

The example is a bit contrived as all results are available immediately. If even a small delay is added, the results are mixed:

var sequence_A = AsyncEnumerable.Range(1, 5)
    .SelectAwait(async i =>{ await Task.Delay(i); return $"A{i}";});
var sequence_B = AsyncEnumerable.Range(1, 5)
    .SelectAwait(async i =>{ await Task.Delay(i); return $"B{i}";});
var sequence_C = AsyncEnumerable.Range(1, 5)
    .SelectAwait(async i =>{ await Task.Delay(i); return $"C{i}";});
var sequence_D = AsyncEnumerable.Range(1, 5)
    .SelectAwait(async i =>{ await Task.Delay(i); return $"D{i}";});

var seq = AsyncEnumerableEx.Merge(sequence_A, sequence_B, sequence_C, sequence_D);

await foreach (var item in seq) Console.WriteLine(item);

This produces different, mixed results each time:

B1
A1
C1
D1
D2
A2
B2
C2
D3
A3
B3
C3
C4
A4
B4
D4
D5
A5
B5
C5

The method's comments explain it was reimplemented to be cheaper and fairer:

//
// This new implementation of Merge differs from the original one in a few ways:
//
// - It's cheaper because:
//   - no conversion from ValueTask<bool> to Task<bool> takes place using AsTask,
//   - we don't instantiate Task.WhenAny tasks for each iteration.
// - It's fairer because:
//   - the MoveNextAsync tasks are awaited concurently, but completions are queued,
//     instead of awaiting a new WhenAny task where "left" sources have preferential
//     treatment over "right" sources.
//
Panagiotis Kanavos
  • Panagiotis this post doesn't answer the question. Could you consider deleting it please? – Theodor Zoulias Jan 17 '22 at 09:34
  • Btw the [comment](https://github.com/dotnet/reactive/blob/305d381dcc46c5966e7260e0959b3083846bf05f/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Merge.cs#L25) that you quoted about a fairer and cheaper implementation assumes that the System.Interactive.Async project is compiled with the `USE_FAIR_AND_CHEAPER_MERGE` symbol, which is not guaranteed. In fact according to my experiments it's unlikely that this is the case. The currently released `AsyncEnumerableEx.Merge` (5.1.0 package version) allocates ~350 bytes per iteration on the merged sequence, which is quite expensive. – Theodor Zoulias Jan 17 '22 at 13:31
  • @TheodorZoulias I thought of a far simpler solution in the meantime - have every source post to a Channel and return that channel's output. Given that Channel guarantees order, there's no way to consume more than one item from Seq_A if other streams have data – Panagiotis Kanavos Jan 17 '22 at 13:37
  • Panagiotis feel free to post this idea as an answer. If you also post an implementation of this idea, I will be able to evaluate it side by side Charlieface's [implementation](https://stackoverflow.com/a/70711683/11178549), and choose the best. Be aware that Charlieface's implementation behaves exactly as I expect and desire, so it might be hard to beat. – Theodor Zoulias Jan 17 '22 at 13:43