3

In the article Deep Dive into Rx SelectMany the author mentions the following in a note right at the end;

Note: To mitigate the ordering issue, SelectMany() comes with an overload which takes a selector with signature Func<TSource, int, Task<TResult>>.

Could anyone enlighten me as to how this might work?

Uwe Keim
  • 39,551
  • 56
  • 175
  • 291
inthegarden
  • 267
  • 1
  • 11
  • Related: [How to merge multiple observables with order preservation and maximum concurrency?](https://stackoverflow.com/questions/64841312/how-to-merge-multiple-observables-with-order-preservation-and-maximum-concurrenc) (`SelectMany` = `Select` + `Merge`) – Theodor Zoulias Jan 05 '21 at 06:25

2 Answers2

4

@supertopi's answer is correct.

For fun, I thought I would make an extension method ToOrdinalOrder that enforces output order, which can then be stitched together with SelectMany to get a version of SelectMany where output order matches input order.

public static class Extensions
{
    public static IObservable<T> ToOrdinalOrder<T>(this IObservable<T> source, Func<T, int> indexSelector)
    {
        return source
            .Scan((expectedIndex: 0, heldItems: ImmutableDictionary<int, T>.Empty, toReturn: Observable.Empty<T>()), (state, item) =>
            {
                var itemIndex = indexSelector(item);
                if (itemIndex == state.expectedIndex)
                {
                    var toReturn = Observable.Return(item);
                    var expectedIndex = itemIndex + 1;
                    var heldItems = state.heldItems;
                    while (heldItems.ContainsKey(expectedIndex))
                    {
                        toReturn = toReturn.Concat(Observable.Return(heldItems[expectedIndex]));
                        heldItems = heldItems.Remove(expectedIndex);
                        expectedIndex++;
                    }
                    return (expectedIndex, heldItems, toReturn);
                }
                else
                    return (state.expectedIndex, state.heldItems.Add(itemIndex, item), Observable.Empty<T>());
            })
            .SelectMany(t => t.toReturn);
    }

    public static IObservable<TResult> OrderedSelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        return source
            .SelectMany(async (TSource item, int index) => (await selector(item), index))
            .ToOrdinalOrder(t => t.Item2)
            .Select(t => t.Item1);
    }
}

Here's some sample code to demonstrate usage:

var source = new Subject<string>();

source
    .Select((item, index) => (item, index)) //index only required for logging purposes. 
    .OrderedSelectMany(async t =>
    {
        var item = t.Item1;
        var index = t.Item2;
        Console.WriteLine($"Starting item {item}, index {index}.");
        switch (index)
        {
            case 0:
                await Task.Delay(TimeSpan.FromMilliseconds(50));
                Console.WriteLine($"Completing item {item}, index {index}.");
                return (item, index);
            case 1:
                await Task.Delay(TimeSpan.FromMilliseconds(200));
                Console.WriteLine($"Completing item {item}, index {index}.");
                return (item, index);
            case 2:
                await Task.Delay(TimeSpan.FromMilliseconds(20));
                Console.WriteLine($"Completing item {item}, index {index}.");
                return (item, index);
            default:
                throw new NotImplementedException();
        }
    })
    .Subscribe(s => Console.WriteLine($"Received item '{s}'."));

source.OnNext("A");
source.OnNext("B");
source.OnNext("C");
source.OnCompleted();

Output looks like this:

Starting item A, index 0.
Starting item B, index 1.
Starting item C, index 2.
Completing item C, index 2.
Completing item A, index 0.
Received item '(A, 0)'.
Completing item B, index 1.
Received item '(B, 1)'.
Received item '(C, 2)'.
Shlomo
  • 14,102
  • 3
  • 28
  • 43
  • ValueTuples are great. Couldn't you also write `.ToOrdinalOrder(t => t.index).Select(t => t.item);` in `OrderedSelectMany` ? – supertopi Jun 21 '17 at 19:56
  • C# 7.0 is what I'm using now. Using that, I would have to name the tuple members explicitly in the previous line. In 7.1 what you describe may be possible. – Shlomo Jun 21 '17 at 21:27
  • 1
    @Shlomo appreciate you taking the time out to knock up some code for me, thanks very much! – inthegarden Jun 22 '17 at 15:41
2

In the metadata description of the said overload it says

//   selector:
//     A transform function to apply to each element; the second parameter of the function
//     represents the index of the source element.

Within the selector function you have access to the value and the original index of the notification from the source.

e.g. if you need to do some work with bunch of values and know when the work for specific source value has completed.

public static IObservable<int> WorkAndReportIndex<TSource>(this IObservable<TSource> source)
{
    Func<TSource, int, Task<int>> selector = async (value, index) =>
    {
        await SomeWork(value); 
        return index;
    };

    return source.SelectMany(selector);
}
supertopi
  • 3,469
  • 26
  • 38