
Is there a way to unwrap an IObservable<Task<T>> into an IObservable<T> while keeping the same order of events, like this?

Tasks:  ----a-------b--c----------d------e---f---->
Values: -------A-----------B--C------D-----E---F-->

Let's say I have a desktop application that consumes a stream of messages, some of which require heavy post-processing:

IObservable<Message> streamOfMessages = ...;

IObservable<Task<Result>> streamOfTasks = streamOfMessages
    .Select(async msg => await PostprocessAsync(msg));

IObservable<Result> streamOfResults = ???; // unwrap streamOfTasks

I can think of two ways of dealing with this.

First, I can subscribe to streamOfTasks using an asynchronous event handler:

streamOfTasks.Subscribe(async task =>
{
    var result = await task;
    Display(result);
});

Second, I can convert streamOfTasks using Observable.Create, like this:

var streamOfResults =
    from task in streamOfTasks
    from value in Observable.Create<Result>(async (obs, cancel) =>
    {
        var v = await task;
        obs.OnNext(v);

        // TODO: don't know when to call obs.OnCompleted()
    })
    select value;

streamOfResults.Subscribe(result => Display(result));

Either way, the order of messages is not preserved: some later messages that don't need any post-processing come out before earlier messages that require post-processing. Both of my solutions handle the incoming messages in parallel, but I'd like them to be processed sequentially, one by one.

I could write a simple task queue to process just one task at a time, but perhaps that's overkill. It seems to me that I'm missing something obvious.


UPD. I wrote a sample console program to demonstrate my approaches. None of the solutions so far preserves the original order of events. Here is the output of the program:

Timer: 0
Timer: 1
Async handler: 1
Observable.Create: 1
Observable.FromAsync: 1
Timer: 2
Async handler: 2
Observable.Create: 2
Observable.FromAsync: 2
Observable.Create: 0
Async handler: 0
Observable.FromAsync: 0

Here is the complete source code:

// "C:\Program Files (x86)\MSBuild\14.0\Bin\csc.exe" test.cs /r:System.Reactive.Core.dll /r:System.Reactive.Linq.dll /r:System.Reactive.Interfaces.dll

using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        Console.WriteLine("Press ENTER to exit.");

        // the source stream
        var timerEvents = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1));
        timerEvents.Subscribe(x => Console.WriteLine($"Timer: {x}"));

        // solution #1: using async event handler
        timerEvents.Subscribe(async x =>
        {
            var result = await PostprocessAsync(x);
            Console.WriteLine($"Async handler: {x}");
        });

        // solution #2: using Observable.Create
        var processedEventsV2 =
            from task in timerEvents.Select(async x => await PostprocessAsync(x))
            from value in Observable.Create<long>(async (obs, cancel) =>
            {
                var v = await task;
                obs.OnNext(v);
            })
            select value;
        processedEventsV2.Subscribe(x => Console.WriteLine($"Observable.Create: {x}"));

        // solution #3: using FromAsync, as answered by @Enigmativity
        var processedEventsV3 =
            from msg in timerEvents
            from result in Observable.FromAsync(() => PostprocessAsync(msg))
            select result;

        processedEventsV3.Subscribe(x => Console.WriteLine($"Observable.FromAsync: {x}"));

        Console.ReadLine();
    }

    static async Task<long> PostprocessAsync(long x)
    {
        // some messages require long post-processing
        if (x % 3 == 0)
        {
            await Task.Delay(TimeSpan.FromSeconds(2.5));
        }

        // and some don't
        return x;
    }
}
Theodor Zoulias
yallie
  • Why are people voting to close this without even leaving a comment? – satnhak Apr 10 '17 at 03:09
  • @yallie - Can you explain what you mean by "order of the events"? The order in which the messages are created but not the tasks completed? – Enigmativity Apr 10 '17 at 22:47
  • @Enigmativity yes, exactly. I need to keep the order in which the messages arrived regardless of the time needed to process them. – yallie Apr 10 '17 at 23:01
  • I wrote an extension to do exactly that - I wanted to zip together the source with the processed results - and order was important. I'll see if I can find it. – Enigmativity Apr 11 '17 at 02:54
  • @Enigmativity that would be great! – yallie Apr 12 '17 at 00:31

5 Answers


Combining @Enigmativity's simple approach with @VMAtm's idea of attaching a counter, plus some code snippets from this SO question, I came up with this solution:

// usage
var processedStream = timerEvents.SelectAsync(async t => await PostprocessAsync(t));

processedStream.Subscribe(x => Console.WriteLine($"Processed: {x}"));

// my sample console program prints the events ordered properly:
Timer: 0
Timer: 1
Timer: 2
Processed: 0
Processed: 1
Processed: 2
Timer: 3
Timer: 4
Timer: 5
Processed: 3
Processed: 4
Processed: 5
....

Here is my SelectAsync extension method, which applies an async selector to an IObservable<TSource> and produces an IObservable<TResult> while keeping the original order of events:

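using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ObservableExtensions
{
    public static IObservable<TResult> SelectAsync<TSource, TResult>(
        this IObservable<TSource> src,
        Func<TSource, Task<TResult>> selectorAsync)
    {
        return Observable.Create<TResult>(observer =>
        {
            // per-subscription state, so that several subscribers don't share the counter
            // using local variable for counter is easier than src.Scan(...)
            var counter = 0;
            var index = 0;
            var buffer = new Dictionary<int, TResult>();

            // tag each result with the index of its source element
            var streamOfTasks =
                from source in src
                from result in Observable.FromAsync(async () => new
                {
                    Index = Interlocked.Increment(ref counter) - 1,
                    Result = await selectorAsync(source)
                })
                select result;

            // buffer the results coming out of order
            return streamOfTasks.Subscribe(
                item =>
                {
                    buffer.Add(item.Index, item.Result);

                    // emit every result whose predecessors have all been emitted
                    TResult result;
                    while (buffer.TryGetValue(index, out result))
                    {
                        buffer.Remove(index);
                        observer.OnNext(result);
                        index++;
                    }
                },
                observer.OnError,      // propagate errors
                observer.OnCompleted); // and completion
        });
    }
}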

I'm not particularly satisfied with my solution, as it looks too complex to me, but at least it doesn't require any external dependencies. I'm using a simple Dictionary here to buffer and reorder the task results because the subscriber doesn't need to be thread-safe (the subscription callbacks are never called concurrently).
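To look at the reordering step on its own, here is a minimal sketch of just the Dictionary buffer, without Rx (a C# 9+ top-level program). The `OnResult` helper and the string values are hypothetical; `emitted.Add` stands in for `observer.OnNext`, and the call order mimics the question's output, where results 1 and 2 arrive before 0.

```csharp
using System;
using System.Collections.Generic;

// Results arrive tagged with the index of their source element, possibly out of order.
var buffer = new Dictionary<int, string>();
var nextIndex = 0;
var emitted = new List<string>();

// Hypothetical helper: store the result, then flush everything that is now in order.
void OnResult(int index, string result)
{
    buffer.Add(index, result);
    while (buffer.TryGetValue(nextIndex, out var ready))
    {
        buffer.Remove(nextIndex);
        emitted.Add(ready); // stands in for observer.OnNext(ready)
        nextIndex++;
    }
}

OnResult(1, "B"); // held back: index 0 is still missing
OnResult(2, "C"); // held back as well
OnResult(0, "A"); // releases A, B and C
OnResult(3, "D"); // already in order, emitted immediately

Console.WriteLine(string.Join(",", emitted)); // prints A,B,C,D
```

A result is held back only while some earlier index is still missing.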

Any comments or suggestions are welcome. I'm still hoping to find a native RX way of doing this without a custom buffering extension method.

yallie

The RX library contains three operators that can unwrap an observable sequence of tasks: Concat, Merge and Switch. All three accept a single source argument of type IObservable<Task<T>> and return an IObservable<T>. Here are their descriptions from the documentation:

Concat

Concatenates all task results, as long as the previous task terminated successfully.

Merge

Merges results from all source tasks into a single observable sequence.

Switch

Transforms an observable sequence of tasks into an observable sequence producing values only from the most recent observable sequence. Each time a new task is received, the previous task's result is ignored.

In other words, Concat returns the results in their original order, Merge returns the results in order of completion, and Switch filters out any results from tasks that didn't complete before the next task was emitted. So your problem can be solved by just using the built-in Concat operator; no custom operator is needed.

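IObservable<Result> streamOfResults = streamOfMessages
    .Select(async msg =>
    {
        // the task is started right away, so the operations still run concurrently
        var result = await PostprocessAsync(msg);
        return result;
    })
    .Concat(); // emits the results in the original order of the messages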
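The ordering difference between Concat and Merge on hot tasks can be sketched with plain TPL and no Rx: awaiting the tasks one by one in source order mirrors what Concat does with the results, while taking them with Task.WhenAny as they finish mirrors Merge. This is only an analogy, not how the Rx operators are implemented, and PostprocessAsync here is a stand-in for the question's method with a shorter delay.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Stand-in for the question's PostprocessAsync: every third message is slow.
static async Task<long> PostprocessAsync(long x)
{
    if (x % 3 == 0)
    {
        await Task.Delay(300);
    }
    return x;
}

// Concat-like: start the (hot) tasks, then await them in their original order.
var concatTasks = Enumerable.Range(0, 5).Select(i => PostprocessAsync(i)).ToList();
var concatOrder = new List<long>();
foreach (var task in concatTasks)
{
    concatOrder.Add(await task);
}

// Merge-like: start a fresh batch and take each result as soon as its task completes.
var pending = Enumerable.Range(0, 5).Select(i => PostprocessAsync(i)).ToList();
var mergeOrder = new List<long>();
while (pending.Count > 0)
{
    var finished = await Task.WhenAny(pending);
    pending.Remove(finished);
    mergeOrder.Add(await finished);
}

Console.WriteLine($"Concat-like: {string.Join(",", concatOrder)}"); // 0,1,2,3,4
Console.WriteLine($"Merge-like:  {string.Join(",", mergeOrder)}");  // 1, 2 and 4 before 0 and 3
```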

The tasks are already started by the time they are emitted; in other words, they emerge in a "hot" state. So the fact that Concat awaits them one after the other has no effect on the concurrency of the operations; it only affects the order of their results. It would matter if, instead of hot tasks, you had cold observables, like those created by the Observable.FromAsync and Observable.Create methods, in which case Concat would execute the operations sequentially.
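A rough way to check the hot/cold distinction, again in plain TPL rather than Rx: the first loop starts all operations up front and then awaits them in order (like Concat over hot tasks), and the second starts each operation only after the previous one has finished (like Concat over cold FromAsync observables). PostprocessAsync is a hypothetical stand-in, and exact timings depend on the machine; with two slow messages of 200 ms each, the hot version should take roughly half as long.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

// Stand-in for the question's PostprocessAsync (shorter delay).
static async Task<long> PostprocessAsync(long x)
{
    if (x % 3 == 0)
    {
        await Task.Delay(200); // slow message
    }
    return x;
}

// Hot: all operations are started up front, then awaited in order.
var hotWatch = Stopwatch.StartNew();
var hotTasks = Enumerable.Range(0, 6).Select(i => PostprocessAsync(i)).ToList();
var hotResults = new List<long>();
foreach (var task in hotTasks)
{
    hotResults.Add(await task);
}
hotWatch.Stop();

// Cold: each operation starts only when the previous one has finished.
var coldWatch = Stopwatch.StartNew();
var coldResults = new List<long>();
foreach (var i in Enumerable.Range(0, 6))
{
    coldResults.Add(await PostprocessAsync(i));
}
coldWatch.Stop();

Console.WriteLine($"hot:  {string.Join(",", hotResults)} in {hotWatch.ElapsedMilliseconds} ms");
Console.WriteLine($"cold: {string.Join(",", coldResults)} in {coldWatch.ElapsedMilliseconds} ms");
```

Both loops produce the results in the original order; only the elapsed time differs.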

Theodor Zoulias

Is the following simple approach an answer for you?

IObservable<Result> streamOfResults =
    from msg in streamOfMessages
    from result in Observable.FromAsync(() => PostprocessAsync(msg))
    select result;
Enigmativity
  • No, I've already tried this approach, and it doesn't work for me. It stops on the first message that requires post-processing (i.e., on the first task that's not completed synchronously). – yallie Apr 10 '17 at 03:32
  • I copied the code into a console application, and now it doesn't just stop, so I guess that was a UI threading issue. But the code behaves the same way as both of my solutions, i.e. the proper order of events is not preserved. – yallie Apr 10 '17 at 04:11

To maintain the order of events, you can funnel your stream into a TransformBlock from TPL Dataflow. The TransformBlock executes your post-processing logic and maintains the order of its output by default.

using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using NUnit.Framework;

namespace HandlingStreamInOrder {

    [TestFixture]
    public class ItemHandlerTests {

        [Test]
        public async Task Items_Are_Output_In_The_Same_Order_As_They_Are_Input() {
            var itemHandler = new ItemHandler();
            var timerEvents = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(250));
            timerEvents.Subscribe(async x => {
                var data = (int)x;
                Console.WriteLine($"Value Produced: {x}");                
                var dataAccepted = await itemHandler.SendAsync((int)data);
                if (dataAccepted) {
                    InputItems.Add(data);
                }                
            });

            await Task.Delay(5000);
            itemHandler.Complete();
            await itemHandler.Completion;

            CollectionAssert.AreEqual(InputItems, itemHandler.OutputValues);
        }

        private IList<int> InputItems {
            get;
        } = new List<int>();
    }

    public class ItemHandler {


        public ItemHandler() {            
            var options = new ExecutionDataflowBlockOptions() {
                BoundedCapacity = DataflowBlockOptions.Unbounded,
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                EnsureOrdered = true
            };
            PostProcessBlock = new TransformBlock<int, int>((Func<int, Task<int>>)PostProcess, options);

            var output = PostProcessBlock.AsObservable().Subscribe(x => {
                Console.WriteLine($"Value Output: {x}");
                OutputValues.Add(x);
            });
        }

        public async Task<bool> SendAsync(int data) {
            return await PostProcessBlock.SendAsync(data);
        }

        public void Complete() {
            PostProcessBlock.Complete();
        }

        public Task Completion {
            get { return PostProcessBlock.Completion; }
        }

        public IList<int> OutputValues {
            get;
        } = new List<int>();

        private IPropagatorBlock<int, int> PostProcessBlock {
            get;
        }

        private async Task<int> PostProcess(int data) {
            if (data % 3 == 0) {
                await Task.Delay(TimeSpan.FromSeconds(2));
            }            
            return data;
        }
    }
}
JSteward
  • Thanks for the suggestion! This approach works fine indeed. I've never used TPL DataFlow, so it took some time to digest what's happening here. I'm surprised that a seemingly basic task would require one more dependency apart from RX. – yallie Apr 10 '17 at 22:09
  • `BoundedCapacity` is `DataflowBlockOptions.Unbounded` by default, and `EnsureOrdered` is `true` by default, too. Also, there is no `EnsureOrdered` in the latest version of `Microsoft.Tpl.Dataflow`. – VMAtm Apr 10 '17 at 22:10

Rx and TPL Dataflow can easily be combined here, and TPL Dataflow preserves the order of events by default, so your code could look something like this:

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static async Task<long> PostprocessAsync(long x) { ... }

IObservable<long> streamOfMessages = ...;
var streamOfTasks = new TransformBlock<long, long>(async msg =>
    await PostprocessAsync(msg),
    // set the concurrency level for messages to handle
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount });
// feed the incoming messages into the block
streamOfMessages.Subscribe(streamOfTasks.AsObserver());
// easily convert the block into an observable; the output keeps the input order
IObservable<long> streamOfResults = streamOfTasks.AsObservable();

Edit: Rx was meant to be a reactive pipeline of events for UIs. As this type of application is generally single-threaded, messages are handled in their original order. But in general, events in C# aren't thread-safe, so you have to provide some additional logic to preserve the order.

If you don't like the idea of introducing another dependency, you need to store the operation number using the Interlocked class, something like this:

// counter for operations that have been started
int operationNumber = 0;
// counter for operations that have been handled
int doneNumber = 0;
...
// take a ticket (1, 2, 3, ...) in the order the messages arrive
var currentOperationNumber = Interlocked.Increment(ref operationNumber);
...
// wait until all earlier operations have been handled
var spinner = new SpinWait();
while (Volatile.Read(ref doneNumber) != currentOperationNumber - 1)
{
    spinner.SpinOnce();
}
// handle event
Interlocked.Increment(ref doneNumber);
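A small runnable sketch of the ticket idea follows (a C# 9+ top-level program). `HandleAsync`, the messages and the delays are hypothetical. An operation holding ticket n proceeds only once `doneNumber` equals n - 1, i.e. once every earlier operation has been handled. Keep in mind that each waiting operation blocks a thread-pool thread while it spins, so this suits only a small number of concurrent operations.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int operationNumber = 0; // tickets handed out so far
int doneNumber = 0;      // operations that have been handled
var output = new ConcurrentQueue<int>();

// Hypothetical handler: takes a ticket on arrival, post-processes for a
// variable time, then waits for its turn before handling the result.
async Task HandleAsync(int message, int delayMs)
{
    // tickets are 1, 2, 3, ... in arrival order (assigned before the first await)
    var ticket = Interlocked.Increment(ref operationNumber);

    await Task.Delay(delayMs); // simulated post-processing

    // proceed only once all earlier tickets have been handled
    var spinner = new SpinWait();
    while (Volatile.Read(ref doneNumber) != ticket - 1)
    {
        spinner.SpinOnce();
    }

    output.Enqueue(message); // "handle event"
    Interlocked.Increment(ref doneNumber);
}

// messages 0..3 arrive in order; 0 and 3 are slow
var tasks = Enumerable.Range(0, 4)
    .Select(i => HandleAsync(i, i % 3 == 0 ? 200 : 10))
    .ToArray();
await Task.WhenAll(tasks);

Console.WriteLine(string.Join(",", output)); // prints 0,1,2,3
```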
VMAtm
  • Thanks a lot! I'm actually very surprised that it can't be done using the tools provided by RX itself. I'm wondering if preserving the order of events is something non-idiomatic for RX? – yallie Apr 10 '17 at 22:18
  • @yallie updated the answer. Events in `C#` aren't thread-safe by default, as they are represented by a list of delegates, so you need some additional synchronization logic. – VMAtm Apr 11 '17 at 13:27
  • I don't quite see the connection between event thread safety and the order of events produced by the observable sequence. Jon Skeet's article is about race conditions that can take place around subscription and invocation. In my case, there are no race conditions because all subscriptions are done at the start of the program. I like the idea of counting the source events to match them (the tasks) with their post-processed results, but I'm not sure if there's an RX primitive for doing such a match. – yallie Apr 11 '17 at 16:25
  • The article is also about thread safety for events. Subscriptions with events lead to a race condition, as the **handling** is done for all events at once, and the race condition occurs while joining the results into one line. – VMAtm Apr 11 '17 at 16:27
  • Not quite right. Multicast delegates are invoked sequentially, one by one, on the thread that invoked the delegate. If an event handler throws an exception, the whole invocation chain is aborted. So there is no race condition during the invocation. Here's the demo: https://gist.github.com/yallie/fd1105ba98919006d3e93d956f705ef8 – yallie Apr 11 '17 at 17:34
  • I'm not talking about the events, but about Rx. – VMAtm Apr 11 '17 at 17:46
  • @yallie I think my English isn't good enough to explain what I meant. I just tried to explain the `Rx` design by comparing it to `C#` events, that's it. As far as I can see, `Rx` does process events differently compared with native `C#` events. – VMAtm Apr 11 '17 at 17:52
  • My bad, sorry, I didn't understand you. The linked article was about plain C# events, so I thought you were talking about them. – yallie Apr 11 '17 at 19:29