@supertopi's answer is correct.
For fun, I thought I would make an extension method ToOrdinalOrder
that enforces output order, which can then be stitched together with SelectMany
to get a version of SelectMany
where output order matches input order.
/// <summary>
/// Rx extension methods that restore a deterministic, index-based output order
/// for sequences whose items may arrive out of order.
/// </summary>
public static class Extensions
{
    /// <summary>
    /// Re-emits the items of <paramref name="source"/> in ascending index order,
    /// buffering any item that arrives before its predecessors.
    /// </summary>
    /// <typeparam name="T">The element type of the sequence.</typeparam>
    /// <param name="source">The sequence whose items may arrive out of order.</param>
    /// <param name="indexSelector">Returns the ordinal position of an item.</param>
    /// <returns>
    /// A sequence that emits an item only once every lower index has been emitted.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="indexSelector"/> is <c>null</c>.
    /// </exception>
    /// <remarks>
    /// The first expected index is 0 and each subsequent expected index is the
    /// previous one plus 1. Items whose index is not the expected one are held
    /// until the expected index is reached; items still held when the source
    /// completes are not emitted.
    /// NOTE(review): indices are assumed to be unique and contiguous from 0 -
    /// a repeated index is added to the held-items dictionary again, and an index
    /// below the expected one is held but never released. Confirm with callers.
    /// </remarks>
    public static IObservable<T> ToOrdinalOrder<T>(this IObservable<T> source, Func<T, int> indexSelector)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (indexSelector == null)
        {
            throw new ArgumentNullException(nameof(indexSelector));
        }

        return source
            .Scan(
                (expectedIndex: 0, heldItems: ImmutableDictionary<int, T>.Empty, toReturn: Observable.Empty<T>()),
                (state, item) =>
                {
                    var itemIndex = indexSelector(item);
                    if (itemIndex != state.expectedIndex)
                    {
                        // Not this item's turn yet: park it and emit nothing for this step.
                        return (state.expectedIndex, state.heldItems.Add(itemIndex, item), Observable.Empty<T>());
                    }

                    // The awaited item arrived: emit it, then drain every directly
                    // following index that is already parked.
                    var toReturn = Observable.Return(item);
                    var expectedIndex = itemIndex + 1;
                    var heldItems = state.heldItems;
                    while (heldItems.TryGetValue(expectedIndex, out var heldItem))
                    {
                        toReturn = toReturn.Concat(Observable.Return(heldItem));
                        heldItems = heldItems.Remove(expectedIndex);
                        expectedIndex++;
                    }

                    return (expectedIndex, heldItems, toReturn);
                })
            // Concat (rather than a merging SelectMany) flattens the per-step batches
            // strictly one after another, so the ordering is guaranteed by the operator.
            .Select(state => state.toReturn)
            .Concat();
    }

    /// <summary>
    /// Projects each item to a task, runs the tasks concurrently, and emits the
    /// results in the order of the source items rather than in completion order.
    /// </summary>
    /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
    /// <typeparam name="TResult">The result type produced by <paramref name="selector"/>.</typeparam>
    /// <param name="source">The input sequence.</param>
    /// <param name="selector">Asynchronous projection applied to every source item.</param>
    /// <returns>The projected results, ordered by the position of their source item.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="selector"/> is <c>null</c>.
    /// </exception>
    public static IObservable<TResult> OrderedSelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (selector == null)
        {
            throw new ArgumentNullException(nameof(selector));
        }

        return source
            // Tag every result with the zero-based position of the item that produced it.
            .SelectMany(async (TSource item, int index) => (result: await selector(item), index: index))
            .ToOrdinalOrder(tagged => tagged.index)
            .Select(tagged => tagged.result);
    }
}
Here's some sample code to demonstrate usage:
// Demo: three items whose asynchronous work finishes out of order, pushed through
// OrderedSelectMany and printed as they are received.
var source = new Subject<string>();

source
    .Select((item, index) => (item, index)) // The index is carried along for logging only.
    .OrderedSelectMany(async pair =>
    {
        var (item, index) = pair;
        Console.WriteLine($"Starting item {item}, index {index}.");

        // Each known index gets its own artificial delay; any other index is unsupported.
        TimeSpan delay;
        switch (index)
        {
            case 0:
                delay = TimeSpan.FromMilliseconds(50);
                break;
            case 1:
                delay = TimeSpan.FromMilliseconds(200);
                break;
            case 2:
                delay = TimeSpan.FromMilliseconds(20);
                break;
            default:
                throw new NotImplementedException();
        }

        await Task.Delay(delay);
        Console.WriteLine($"Completing item {item}, index {index}.");
        return (item, index);
    })
    .Subscribe(received => Console.WriteLine($"Received item '{received}'."));

// Push the three items, then signal the end of the sequence.
source.OnNext("A");
source.OnNext("B");
source.OnNext("C");
source.OnCompleted();
Output looks like this:
Starting item A, index 0.
Starting item B, index 1.
Starting item C, index 2.
Completing item C, index 2.
Completing item A, index 0.
Received item '(A, 0)'.
Completing item B, index 1.
Received item '(B, 1)'.
Received item '(C, 2)'.