I have a few IAsyncEnumerable<string>s that I would like to merge into a single IAsyncEnumerable<string>, which should contain all the elements that are emitted concurrently from those sequences. So I used the Merge operator from the System.Interactive.Async package. The problem is that this operator does not always treat all sequences equally. In some circumstances it prefers emitting elements from the sequences on the left side of the argument list, and neglects the sequences on the right side. Here is a minimal example that reproduces this undesirable behavior:
using System;
using System.Linq; // Enumerable.Range; ToAsyncEnumerable (System.Linq.Async); AsyncEnumerableEx (System.Interactive.Async)

var sequence_A = Enumerable.Range(1, 5).Select(i => $"A{i}").ToAsyncEnumerable();
var sequence_B = Enumerable.Range(1, 5).Select(i => $"B{i}").ToAsyncEnumerable();
var sequence_C = Enumerable.Range(1, 5).Select(i => $"C{i}").ToAsyncEnumerable();
var merged = AsyncEnumerableEx.Merge(sequence_A, sequence_B, sequence_C);
await foreach (var item in merged) Console.WriteLine(item);
This code snippet also has a dependency on the System.Linq.Async package. The sequence_A emits 5 elements starting with "A", the sequence_B emits 5 elements starting with "B", and the sequence_C emits 5 elements starting with "C".
Output (undesirable):
A1
A2
A3
A4
A5
B1
B2
B3
B4
B5
C1
C2
C3
C4
C5
The desirable output should look like this:
A1
B1
C1
A2
B2
C2
A3
B3
C3
A4
B4
C4
A5
B5
C5
When all sequences have their next element available, the merged sequence should pull one element from each sequence in turn, instead of repeatedly pulling elements from the left-most sequence.
How can I ensure that my sequences are merged fairly? I am looking for a combination of operators from the official packages that has the desirable behavior, or for a custom Merge operator that does what I want.
Clarification: I am interested in the concurrent Merge functionality, where all source sequences are observed at the same time, and any emission from any of the sequences is propagated to the merged sequence. The concept of fairness applies when more than one sequence can emit an element immediately, in which case their emissions should be interleaved. In the opposite case, when no element is immediately available, the rule is "first come, first served".
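A minimal sketch of a custom operator following the semantics above, under stated assumptions. `FairMergeSketch` and `MergeFair` are hypothetical names, not part of System.Interactive.Async or System.Linq.Async; the sketch uses only BCL types (`IAsyncEnumerable<T>`, `Task.WhenAny`, `CancellationTokenSource`). The idea is to keep one in-flight `MoveNextAsync` per source, and after serving a source, move it to the back of a list, so that when several sources are ready at once they take turns. It is not tuned for performance (each step allocates a `Task.WhenAny`), and its cleanup path assumes that pending sources eventually complete after cancellation is signaled.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class FairMergeSketch
{
    // Hypothetical operator, not part of System.Interactive.Async.
    // Enumerates all sources concurrently; among sources that already have an
    // element available, the one served least recently goes first.
    public static async IAsyncEnumerable<T> MergeFair<T>(
        IEnumerable<IAsyncEnumerable<T>> sources,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        // Each entry pairs a source enumerator with its in-flight MoveNextAsync task.
        // List order is the round-robin order: index 0 = least recently served.
        var active = new List<(IAsyncEnumerator<T> Enumerator, Task<bool> MoveNext)>();
        try
        {
            foreach (var source in sources)
            {
                var e = source.GetAsyncEnumerator(cts.Token);
                active.Add((e, e.MoveNextAsync().AsTask()));
            }
            while (active.Count > 0)
            {
                // Wait until at least one source has produced an element, completed, or faulted.
                await Task.WhenAny(active.Select(x => x.MoveNext)).ConfigureAwait(false);
                // Pick the first ready entry in round-robin order.
                int index = active.FindIndex(x => x.MoveNext.IsCompleted);
                var (enumerator, moveNext) = active[index];
                bool hasItem = await moveNext.ConfigureAwait(false); // rethrows a source's exception
                active.RemoveAt(index);
                if (!hasItem)
                {
                    await enumerator.DisposeAsync().ConfigureAwait(false);
                    continue;
                }
                T item = enumerator.Current;
                // Ask this source for its next element right away, and move it to the
                // back of the list so other ready sources are served before it again.
                active.Add((enumerator, enumerator.MoveNextAsync().AsTask()));
                yield return item;
            }
        }
        finally
        {
            // Early exit (break, exception, or cancellation): signal the remaining
            // sources, wait for their pending MoveNextAsync calls, then dispose them.
            cts.Cancel();
            foreach (var (enumerator, moveNext) in active)
            {
                try { await moveNext.ConfigureAwait(false); }
                catch { /* the operation is being abandoned; ignore its outcome */ }
                await enumerator.DisposeAsync().ConfigureAwait(false);
            }
        }
    }
}
```

Usage with the sequences from the first example would be `var merged = FairMergeSketch.MergeFair(new[] { sequence_A, sequence_B, sequence_C });`. When the sources complete their `MoveNextAsync` calls synchronously, the rotation is intended to yield A1, B1, C1, A2, and so on. One design choice to be aware of: if several sources become ready during the same wait, this sketch serves them in round-robin order rather than in strict completion order, which is an assumption about how "first come, first served" should behave in that case.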
Update: Here is a more realistic demo that includes latency in the producer sequences and in the consuming enumeration loop. It simulates a situation where consuming the values produced by the left-most sequence takes longer than producing them.
var sequence_A = Produce("A", 200, 1, 2, 3, 4, 5);
var sequence_B = Produce("B", 150, 1, 2, 3, 4, 5);
var sequence_C = Produce("C", 100, 1, 2, 3, 4, 5);
var merged = AsyncEnumerableEx.Merge(sequence_A, sequence_B, sequence_C);
await foreach (var item in merged)
{
    Console.WriteLine(item);
    await Task.Delay(item.StartsWith("A") ? 300 : 50); // Latency
}

async IAsyncEnumerable<string> Produce(string prefix, int delay, params int[] values)
{
    foreach (var value in values)
    {
        var delayTask = Task.Delay(delay);
        yield return $"{prefix}{value}";
        await delayTask; // Latency
    }
}
The result is an undesirable bias toward the values produced by the sequence_A:
A1
A2
A3
A4
A5
B1
B2
C1
B3
C2
B4
C3
C4
B5
C5