I have tried to simplify my issue with the sample code below. I have a producer thread constantly pumping in data, and I am trying to batch it with a time delay between batches so that the UI has time to render it. But the result is not as expected: the producer and consumer seem to be on the same thread.

I don't want the batch buffer to sleep on the thread that is producing. I tried SubscribeOn, but it did not help much. What am I doing wrong here? How do I get this to print different thread IDs for the producer and the consumer?

static void Main(string[] args)
{
    var stream = new ReplaySubject<int>();

    Task.Factory.StartNew(() =>
    {
        int seed = 1;
        while (true)
        {
            Console.WriteLine("Thread {0} Producing {1}",
                Thread.CurrentThread.ManagedThreadId, seed);

            stream.OnNext(seed);
            seed++;

            Thread.Sleep(TimeSpan.FromMilliseconds(500));
        }
    });

    stream.Buffer(5).Do(x =>
    {
        Console.WriteLine("Thread {0} sleeping to create time gap between batches",
            Thread.CurrentThread.ManagedThreadId);

        Thread.Sleep(TimeSpan.FromSeconds(2));
    })
    .SubscribeOn(NewThreadScheduler.Default).Subscribe(items =>
    {
        foreach (var item in items)
        {
            Console.WriteLine("Thread {0} Consuming {1}",
                Thread.CurrentThread.ManagedThreadId, item);
        }
    });
    Console.Read();
}
Massimiliano Kraus
anivas

3 Answers

3

Understanding the difference between ObserveOn and SubscribeOn is key here. See "ObserveOn and SubscribeOn - where the work is being done" for an in-depth explanation of these.

Also, you absolutely don't want to use Thread.Sleep in your Rx. Or anywhere. Ever. Do is almost as evil, but Thread.Sleep is almost always totally evil. Buffer has several overloads you want to use instead: a time-based overload, and an overload that accepts both a count limit and a time limit and returns a buffer when either is reached. Time-based buffering will introduce the necessary concurrency between producer and consumer; that is, it delivers the buffer to its subscriber on a separate thread from the producer.
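As a rough sketch of the count-and-time overload mentioned above (not the asker's final code), the following uses `Buffer(TimeSpan, int)` from System.Reactive and prints the thread IDs so you can check for yourself which thread delivers each batch. The class name `TimedBufferSketch`, the helper `Batch`, the 2-second/5-item limits and the 20-item producer loop are all assumptions made for illustration. `await Task.Delay` stands in for the blocking sleep in the producer.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public static class TimedBufferSketch
{
    // Hypothetical helper: closes a batch after 2 seconds or 5 items,
    // whichever limit is reached first.
    public static IObservable<IList<int>> Batch(IObservable<int> source) =>
        source.Buffer(TimeSpan.FromSeconds(2), 5);

    public static async Task Main()
    {
        var stream = new Subject<int>();

        using (Batch(stream).Subscribe(items =>
            Console.WriteLine("Thread {0} consuming [{1}]",
                Thread.CurrentThread.ManagedThreadId,
                string.Join(", ", items))))
        {
            // Producer: one value every 500 ms, without blocking a thread.
            for (int seed = 1; seed <= 20; seed++)
            {
                Console.WriteLine("Thread {0} producing {1}",
                    Thread.CurrentThread.ManagedThreadId, seed);
                stream.OnNext(seed);
                await Task.Delay(TimeSpan.FromMilliseconds(500));
            }

            // Completing the source flushes any partially filled batch.
            stream.OnCompleted();
        }
    }
}
```

Note that a time-based buffer can also emit empty batches when nothing arrives within the time window, so a real consumer may want to skip those.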

Also see these questions and answers which have good discussions on keeping consumers responsive (in the context of WPF here, but the points are generally applicable).

The last question above specifically uses the time-based buffer overload. As I said, using Buffer or ObserveOn in your call chain will allow you to add concurrency between producer and consumer. You still need to make sure each buffer is processed fast enough that a queue doesn't build up on the buffer subscriber.
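To make the ObserveOn option concrete, here is a minimal sketch, assuming the question's count-based `Buffer(5)` is kept. SubscribeOn only changes the thread on which the subscription itself is set up; notifications still arrive on whichever thread calls `OnNext`. ObserveOn, by contrast, hands each notification over to the scheduler you give it. The class name `ObserveOnSketch`, the helper `Batch` and the 20-item producer loop are illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public static class ObserveOnSketch
{
    // Hypothetical helper: batch by count, then deliver each batch
    // through the given scheduler instead of the producer's thread.
    public static IObservable<IList<int>> Batch(
        IObservable<int> source, IScheduler consumerScheduler) =>
        source.Buffer(5).ObserveOn(consumerScheduler);

    public static async Task Main()
    {
        var stream = new Subject<int>();
        var done = new TaskCompletionSource<bool>();

        using (Batch(stream, NewThreadScheduler.Default).Subscribe(
            items => Console.WriteLine("Thread {0} consuming [{1}]",
                Thread.CurrentThread.ManagedThreadId,
                string.Join(", ", items)),
            () => done.SetResult(true)))
        {
            for (int seed = 1; seed <= 20; seed++)
            {
                Console.WriteLine("Thread {0} producing {1}",
                    Thread.CurrentThread.ManagedThreadId, seed);
                stream.OnNext(seed);
                await Task.Delay(TimeSpan.FromMilliseconds(500));
            }

            stream.OnCompleted();
            await done.Task; // wait until the consumer has seen completion
        }
    }
}
```

ObserveOn queues notifications for the consumer, so a consumer that is slower than the producer will still accumulate a backlog; that is the queue build-up discussed here.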

If queues do build up, you'll need to think about means of applying backpressure, dropping updates and/or conflating the updates. This is a big topic, too broad for in-depth discussion here, but basically you either:

See if proper buffering helps first, then think about throttling/conflating events at the source (a UI can only show so much information anyway), then consider smarter conflation, as this can get quite complex. https://github.com/AdaptiveConsulting/ReactiveTrader is a good example of a project using some advanced conflation techniques.

James World
  • I cannot drop events or take the latest value; I need all the messages delivered to the UI because they form individual rows. Adding ObserveOn seems to do the trick. Why do you say Do with Sleep is evil? Is there a better way to create a delay between batches? – anivas Dec 03 '14 at 06:41
  • @anivas - You can cause a delay by using `Zip` with an `Interval` observable. – Enigmativity Dec 03 '14 at 07:29
  • @Enigmativity I tried that approach; the timer in the interval creates a memory leak. – anivas Dec 03 '14 at 08:29
  • @anivas - It should all go away when you dispose of the observable. – Enigmativity Dec 03 '14 at 10:55
  • @anivas What he said ^ :) Sleep is evil in general because it is wasteful of the thread resource. Threads aren't cheap and you could cause unnecessary spawning of threads. `await Task.Delay(...)` is often better since it allows the thread to do other useful work - but even then, introducing a wait like this is almost always a code smell that a better design can usually avoid. In Rx, it blocks the entire chain running on that thread, which could include the producer and other subscribers depending on the schedulers being used. – James World Dec 03 '14 at 14:29
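The `Zip` with `Interval` suggestion from the comments could look roughly like this. It is a sketch under assumptions, not Enigmativity's actual code: the class name `ZipPacingSketch`, the helper `Pace`, the batch size and the timings are all made up for illustration. Each batch is paired with one timer tick, so batches are released no faster than one per `gap`.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public static class ZipPacingSketch
{
    // Hypothetical helper: release at most one batch per timer tick.
    public static IObservable<IList<int>> Pace(
        IObservable<int> source, TimeSpan gap) =>
        source.Buffer(5)
              .Zip(Observable.Interval(gap), (batch, tick) => batch);

    public static async Task Main()
    {
        var stream = new Subject<int>();
        var done = new TaskCompletionSource<bool>();

        using (Pace(stream, TimeSpan.FromSeconds(2)).Subscribe(
            items => Console.WriteLine("Thread {0} consuming [{1}]",
                Thread.CurrentThread.ManagedThreadId,
                string.Join(", ", items)),
            () => done.SetResult(true)))
        {
            // Producer runs faster than the pacing timer on purpose.
            for (int seed = 1; seed <= 20; seed++)
            {
                stream.OnNext(seed);
                await Task.Delay(TimeSpan.FromMilliseconds(100));
            }

            stream.OnCompleted();
            await done.Task; // remaining batches drain one per tick
        }
    }
}
```

Zip queues values from whichever side runs ahead, so if the timer ticks faster than batches arrive, unmatched ticks pile up until a batch consumes them. That may be the growth anivas observed. Disposing the subscription stops the timer and releases those queues.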
2

Although the other answers are correct, I'd like to identify your actual problem as perhaps a misunderstanding of the behavior of Rx. Putting the producer to sleep blocks subsequent calls to OnNext and it seems as though you're assuming Rx automatically calls OnNext concurrently, but in fact it doesn't for very good reasons. Actually, Rx has a contract that requires serialized notifications.

See §§4.2, 6.7 in the Rx Design Guidelines for details.

Ultimately, it looks as though you're trying to implement the BufferIntrospective operator from Rxx. This operator allows you to pass in a concurrency-introducing scheduler, similar to ObserveOn, to create a concurrency boundary between a producer and a consumer. BufferIntrospective is a dynamic backpressure strategy that pushes out heterogeneously-sized batches based on the changing latencies of an observer. While the observer is processing the current batch, the operator buffers all incoming concurrent notifications. To accomplish this, the operator takes advantage of the fact that OnNext is a blocking call (per the §4.2 contract) and for that reason this operator should be applied as close to the edge of the query as possible, generally immediately before you call Subscribe.

As James described, you could call it a "smart buffering" strategy itself, or see it as the baseline for implementing such a strategy; e.g., I've also defined a SampleIntrospective operator that drops all but the last notification in each batch.

Dave Sexton
1

ObserveOn is probably what you want. One of its overloads takes a SynchronizationContext as an argument, which should be the SynchronizationContext of your UI. If you don't know how to get it, see "Using SynchronizationContext for sending events back to the UI for WinForms or WPF".
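A minimal sketch of the SynchronizationContext overload follows. In a WinForms or WPF app you would normally capture `SynchronizationContext.Current` on the UI thread and pass that in. A console app has no such context, so the `SingleThreadContext` class below is a hypothetical stand-in that runs posted callbacks in order on one dedicated thread. It, `UiContextSketch` and `OnUi` are illustrative names, not part of Rx.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical stand-in for a UI context: runs posted callbacks
// one at a time, in order, on a single background thread.
public sealed class SingleThreadContext : SynchronizationContext, IDisposable
{
    private readonly BlockingCollection<Action> _queue =
        new BlockingCollection<Action>();

    public SingleThreadContext()
    {
        var pump = new Thread(() =>
        {
            foreach (var work in _queue.GetConsumingEnumerable())
                work();
        }) { IsBackground = true };
        pump.Start();
    }

    public override void Post(SendOrPostCallback d, object state) =>
        _queue.Add(() => d(state));

    public void Dispose() => _queue.CompleteAdding();
}

public static class UiContextSketch
{
    // Hypothetical helper: deliver notifications through the given context.
    public static IObservable<T> OnUi<T>(
        IObservable<T> source, SynchronizationContext ui) =>
        source.ObserveOn(ui);

    public static void Main()
    {
        using (var ui = new SingleThreadContext())
        {
            Console.WriteLine("Main thread {0}",
                Thread.CurrentThread.ManagedThreadId);

            // Block until every value has been handled on the context thread.
            OnUi(Observable.Range(1, 5), ui)
                .ForEachAsync(x => Console.WriteLine(
                    "Thread {0} consuming {1}",
                    Thread.CurrentThread.ManagedThreadId, x))
                .Wait();
        }
    }
}
```

The stand-in preserves ordering because it uses one thread and a FIFO queue. The base `SynchronizationContext` class posts to the thread pool instead, which does not guarantee order and so would not be a safe substitute here.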

Daniel C. Weber