
I want to be able to process a stream of events on multiple cores, but keep everything synchronized so that events are processed in lock step by all subscribers, and no single subscriber ever gets ahead of any other subscriber.

In other words, I want a fast subscriber to wait until all slower subscribers have finished with each event before moving on to the next. Each subscriber will have a filter, so it only processes the events it's interested in.

If this works, I can easily take advantage of all of the cores in my system, without running into too many multithreading or synchronization issues.

Example

Imagine we have a stream of RX events generated on a single thread. We have two RX subscribers, A and B. We have these constraints:

  • Each RX event must be processed in lock step by all subscribers, i.e. subscriber B will not process event j=2 until event j=1 has been completely processed by both subscribers A and B, will not process event j=3 until event j=2 has been completely processed by both, etc.
  • Each RX event is processed in parallel, i.e. subscriber A can process event j=1 while subscriber B processes event j=1, etc.
  • Order invariance, i.e. all subscribers receive the events in the order in which they are created, so event j=0 will always precede j=1, event j=1 will always precede j=2, etc. This happens automatically if events are pushed in on a single thread, so this constraint has already been met.
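
To make the three constraints concrete, here is a minimal sketch of the behaviour they describe. It deliberately uses the plain `System.Threading.Barrier` class rather than Rx, so it is only an illustration of the lock-step requirement, not a proposed Rx solution; the event list and the sleep times are made-up values.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Sketch only: plain System.Threading.Barrier instead of Rx.
int[] events = { 0, 1, 2 };              // hypothetical event stream, already in order
int[] workMs = { 20, 100 };              // subscriber A is fast, subscriber B is slow
var log = new ConcurrentQueue<int>();    // event ids in the order they were completed

// One participant per subscriber; nobody gets past SignalAndWait
// until every participant has arrived.
var gate = new Barrier(participantCount: workMs.Length);

var subscribers = workMs.Select(ms => Task.Run(() =>
{
    foreach (int e in events)
    {
        Thread.Sleep(ms);        // simulate processing event e
        log.Enqueue(e);
        gate.SignalAndWait();    // wait for the other subscriber before the next event
    }
})).ToArray();

Task.WaitAll(subscribers);
Console.WriteLine(string.Join(",", log));   // 0,0,1,1,2,2
```

Both subscribers work on the same event at the same time, but the fast one blocks at the barrier until the slow one has finished, so no completion of event j+1 can be logged before both completions of event j.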

What I have so far

I have tried a lot of combinations of Synchronize, in conjunction with the following code:

var sw = Stopwatch.StartNew();
var rx = new Subject<int>();
rx.ObserveOn(ThreadPoolScheduler.Instance)
    .Subscribe(o =>
{
    // Fast Subscriber A. Takes 20 milliseconds.
    Thread.Sleep(TimeSpan.FromMilliseconds(20));
    Console.Write("Subscriber A: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
});

rx.ObserveOn(ThreadPoolScheduler.Instance)              
    .Subscribe(o =>
{
    // Slow Subscriber B. Takes 500 milliseconds.
    Thread.Sleep(TimeSpan.FromMilliseconds(500));
    Console.Write("Subscriber B: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
});

for (int j = 0; j < 5; j++)
{

    int j1 = j;
    rx.OnNext(j1);
    Console.Write("Push: {0} (thread {1})\n", j, Thread.CurrentThread.ManagedThreadId);
}

Current output of program

Push: j=0 (thread 9)
Push: j=1 (thread 9)
Push: j=2 (thread 9)
Push: j=3 (thread 9)
Push: j=4 (thread 9)
[any key to continue]
Subscriber A: j=0 (thread 10). Time: 288 milliseconds.
Subscriber A: j=1 (thread 10). Time: 308 milliseconds.
Subscriber A: j=2 (thread 10). Time: 328 milliseconds.
Subscriber A: j=3 (thread 10). Time: 348 milliseconds.
Subscriber A: j=4 (thread 10). Time: 368 milliseconds.
Subscriber B: j=0 (thread 11). Time: 768 milliseconds.
Subscriber B: j=1 (thread 11). Time: 1268 milliseconds.
Subscriber B: j=2 (thread 11). Time: 1768 milliseconds.
Subscriber B: j=3 (thread 11). Time: 2268 milliseconds.
Subscriber B: j=4 (thread 11). Time: 2768 milliseconds.

Desired output of program

Push: j=0 (thread 9)
Push: j=1 (thread 9)
Push: j=2 (thread 9)
Push: j=3 (thread 9)
Push: j=4 (thread 9)
[any key to continue]
Subscriber A: j=0 (thread 10). Time: 000 milliseconds.
Subscriber B: j=0 (thread 11). Time: 000 milliseconds.
Subscriber A: j=1 (thread 10). Time: 500 milliseconds.
Subscriber B: j=1 (thread 11). Time: 500 milliseconds.
Subscriber A: j=2 (thread 10). Time: 1000 milliseconds.
Subscriber B: j=2 (thread 11). Time: 1000 milliseconds.
Subscriber A: j=3 (thread 10). Time: 1500 milliseconds.
Subscriber B: j=3 (thread 11). Time: 1500 milliseconds.
Subscriber A: j=4 (thread 10). Time: 2000 milliseconds.
Subscriber B: j=4 (thread 11). Time: 2000 milliseconds.

Essentially, I want all subscribers to process event j=0 in parallel, then all subscribers to process event j=1 in parallel, etc., even if some of the subscribers are slower than the others. In this case, Subscriber A is fast (20 milliseconds) and Subscriber B is slow (500 milliseconds), so we need some sort of lock or gate so that subscriber A waits for subscriber B to finish before moving on to the next event, or vice versa if subscriber B is faster than subscriber A.

Of course, this is what occurs naturally in single threaded mode, but then one loses the ability for the same event to be processed in parallel by many subscribers, which means I cannot easily take advantage of all of the cores on my system.

Update

Thank you @Jonas Chapuis for your answer using Sort().

However, in this particular case, what I am aiming for is to stop fast subscribers from getting ahead of slow subscribers when consuming events, i.e. I need some sort of lock or gate so that a fast subscriber will wait until all of the slow subscribers have finished with the event before moving on to the next event.

In other words, I want all subscribers to move in lock step through the events, with no individual subscriber getting ahead of the rest. The RX events will be created on a single thread, so they will never get out of order.

Update

After some months, it turns out that I was using the wrong architecture, and this was the wrong question to ask.

Instead of observing on ThreadPoolScheduler.Instance, I should be observing on an EventLoopScheduler, which locks all subscriptions to a single thread. This preserves the order.

In order to get parallelism for time series data, it's better to divide the data processing into a pipeline with multiple stages, with each thread concentrating on a single pipeline stage. This is much simpler to deal with, and satisfies all of the constraints above.
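
As a minimal sketch of that pipeline idea (not the code actually used; the choice of `BlockingCollection<T>`, the two stages, and the arithmetic inside them are all illustrative assumptions):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Sketch only: a two-stage pipeline where each stage owns one thread.
var raw = new BlockingCollection<int>(boundedCapacity: 4);     // producer -> stage 1
var scaled = new BlockingCollection<int>(boundedCapacity: 4);  // stage 1 -> stage 2
var results = new List<int>();                                 // written by stage 2 only

// Stage 1 runs on its own thread and only ever performs the first step.
var stage1 = Task.Factory.StartNew(() =>
{
    foreach (int x in raw.GetConsumingEnumerable())
        scaled.Add(x * 10);
    scaled.CompleteAdding();    // pass end-of-stream on to the next stage
}, TaskCreationOptions.LongRunning);

// Stage 2 runs on another thread and only ever performs the second step.
var stage2 = Task.Factory.StartNew(() =>
{
    foreach (int x in scaled.GetConsumingEnumerable())
        results.Add(x + 1);
}, TaskCreationOptions.LongRunning);

for (int j = 0; j < 5; j++) raw.Add(j);   // single producer, so order is fixed here
raw.CompleteAdding();
Task.WaitAll(stage1, stage2);

Console.WriteLine(string.Join(",", results));   // 1,11,21,31,41
```

Each queue is first-in, first-out and has exactly one consumer, so the event order survives every stage, while different events are worked on by different threads at the same time.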

Contango
  • Related to http://stackoverflow.com/questions/16658915/reactive-extensions-concurrency-within-the-subscriber – Contango Feb 01 '15 at 12:25

2 Answers


From what I can gather, you are trying to maximize processing throughput while still retaining element order. Here is a solution based on the custom Sort() operator as described by James World in Reordering events with Reactive Extensions.

Processing elements in parallel implies losing the order. To restore the original order we are using the Sort() operator (behind the scenes, that operator mainly performs buffering and releases values according to the passed key generator function).

  var random = new Random();
  var xs = Observable.Range(0, 10);
  xs.SelectMany((index, value) => Observable.Start(() =>
  {
      Thread.Sleep(TimeSpan.FromMilliseconds(random.Next(0, 1000)));
      Console.WriteLine("Thread {0}: processing value {1}.",
          Thread.CurrentThread.ManagedThreadId.ToString().PadLeft(2), value);
      return new {Index = index, Value = value};
  }, ThreadPoolScheduler.Instance))
      .Sort(el => el.Index, 0, i => i + 1)
      .Subscribe(el => Console.WriteLine(el.Value));

This produces output such as the following:

Thread 15: processing value 7.
Thread 10: processing value 0.
0
Thread 16: processing value 5.
Thread 11: processing value 1.
1
Thread 15: processing value 8.
Thread 14: processing value 6.
Thread 13: processing value 2.
2
Thread 10: processing value 9.
Thread 17: processing value 4.
Thread 12: processing value 3.
3
4
5
6
7
8
9
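
The buffering that Sort() is described as doing above can be pictured with a small stand-alone model. This is a sketch under assumptions, not James World's implementation: it uses integer keys starting at 0 with a fixed "next key = key + 1" rule, and the `OnArrival` helper is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;

// Sketch only: hold back out-of-order items until their predecessors have arrived.
var pending = new Dictionary<int, string>();   // items that arrived too early, by key
var released = new List<string>();             // items emitted in key order
int nextKey = 0;                               // the key we are currently waiting for

void OnArrival(int key, string value)
{
    pending[key] = value;
    // Release the longest run of consecutive keys starting at nextKey.
    while (pending.TryGetValue(nextKey, out var ready))
    {
        pending.Remove(nextKey);
        released.Add(ready);
        nextKey++;
    }
}

OnArrival(2, "c");   // held back: key 0 has not arrived yet
OnArrival(0, "a");   // releases a
OnArrival(1, "b");   // releases b, then the buffered c
Console.WriteLine(string.Join(",", released));   // a,b,c
```

This is why the sample output above prints 3 through 9 in one burst: those values had already been processed and were waiting in the buffer for value 3.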
Jonas Chapuis
  • This looks absolutely brilliant, I'm trying it out now. – Contango Jan 22 '15 at 20:12
  • This answer works very well if we want to ensure that events are consumed in the correct order. However, what I am aiming for (in this particular case) is to ensure that fast subscribers do not get ahead of slow subscribers, i.e. all subscribers have finished processing each event before any subscriber moves on to the next event. I have updated the question to clarify this. – Contango Jan 24 '15 at 12:11

OK, I had indeed not fully understood your requirements, sorry about that. Below is a different approach, which relies on subscribers signalling that they are done via dedicated subjects. These subjects are then zipped together, which gives you the "lock" semantics (note that overloads of the Zip operator support up to 16 sources).

 var sw = Stopwatch.StartNew();
 var rx = new Subject<int>();
 var subscriberADone = new Subject<Unit>();
 var subscriberBDone = new Subject<Unit>();
 var bothSubscribersDone = subscriberADone.Zip(subscriberBDone, (_, __) => Unit.Default);
 var lockStepInput = rx.Zip(bothSubscribersDone.StartWith(Unit.Default), (i, _) => i);
 lockStepInput.ObserveOn(ThreadPoolScheduler.Instance)
     .Subscribe(o =>
     {
         // Fast Subscriber A. Takes 20 milliseconds.
         Thread.Sleep(TimeSpan.FromMilliseconds(20));
         Console.Write("Subscriber A: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
         subscriberADone.OnNext(Unit.Default);
     });

 lockStepInput.ObserveOn(ThreadPoolScheduler.Instance)
     .Subscribe(o =>
     {
         // Slow Subscriber B. Takes 500 milliseconds.
         Thread.Sleep(TimeSpan.FromMilliseconds(500));
         Console.Write("Subscriber B: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
         subscriberBDone.OnNext(Unit.Default);
     });

 for (int j = 0; j < 5; j++)
 {

     int j1 = j;
     rx.OnNext(j1);
     Console.Write("Push: {0} (thread {1})\n", j, Thread.CurrentThread.ManagedThreadId);
 }

This generated the following output:

Push: 0 (thread 9)
Push: 1 (thread 9)
Push: 2 (thread 9)
Push: 3 (thread 9)
Push: 4 (thread 9)
Subscriber A: 0 (thread 10). Time: 111 milliseconds.
Subscriber B: 0 (thread 11). Time: 591 milliseconds.
Subscriber A: 1 (thread 10). Time: 611 milliseconds.
Subscriber B: 1 (thread 11). Time: 1091 milliseconds.
Subscriber A: 2 (thread 10). Time: 1111 milliseconds.
Subscriber B: 2 (thread 11). Time: 1591 milliseconds.
Subscriber A: 3 (thread 10). Time: 1611 milliseconds.
Subscriber B: 3 (thread 11). Time: 2091 milliseconds.
Subscriber A: 4 (thread 10). Time: 2111 milliseconds.
Subscriber B: 4 (thread 11). Time: 2591 milliseconds.
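
The gating performed by the two Zip calls can be modelled without Rx. The following is a single-threaded sketch of the bookkeeping only, not how Rx implements Zip; the `Push`, `Done` and `TryRelease` helpers and the token counter are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;

// Sketch only: a single-threaded model of the Zip bookkeeping, not Rx itself.
var waiting = new Queue<int>();     // events pushed into rx but not yet released
var released = new List<int>();     // events handed on to the subscribers
int tokens = 1;                     // StartWith(Unit.Default): event 0 needs no wait
int aDone = 0, bDone = 0;           // unpaired "done" signals per subscriber

void TryRelease()
{
    // Outer Zip: pair the next waiting event with the next available token.
    while (tokens > 0 && waiting.Count > 0)
    {
        tokens--;
        released.Add(waiting.Dequeue());
    }
}

void Push(int e) { waiting.Enqueue(e); TryRelease(); }

void Done(char subscriber)
{
    if (subscriber == 'A') aDone++; else bDone++;
    // Inner Zip: one token per matched pair of A-done and B-done signals.
    if (aDone > 0 && bDone > 0)
    {
        aDone--; bDone--; tokens++;
        TryRelease();
    }
}

Push(0); Push(1); Push(2);
Console.WriteLine(string.Join(",", released));   // 0
Done('A');                                       // fast subscriber finishes first
Console.WriteLine(string.Join(",", released));   // 0
Done('B');                                       // slow subscriber finishes
Console.WriteLine(string.Join(",", released));   // 0,1
```

The model also shows why all five "Push" lines appear before any subscriber output: pushed events are simply queued until a token becomes available.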
Jonas Chapuis
  • Excellent, that solves the problem very neatly. You are definitely an expert in RX. Thank you ever so much! – Contango Feb 01 '15 at 12:22