We have a situation where we want to start a background "polling" operation in a C# application that returns values periodically using reactive extensions. The process we'd like to implement is the following:

  1. A caller calls a method like Poll() that returns an IObservable
  2. The caller subscribes to said observable, and that starts a background thread/task that interacts with hardware to retrieve values on some interval
  3. When the caller is done it disposes of the subscription and that automatically stops the background thread/task

Attempt #1

To try to prove this out I wrote the following console app, but this isn't acting the way I was expecting:

public class OutputParameters
{
    public Guid Id { get; set; }
    public int Value { get; set; }
}

public class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("Requesting the polling operation");
        var worker1 = Poll();

        Console.WriteLine("Subscribing to start the polling operation");

        var sub1 = worker1.Subscribe(
            value => { Console.WriteLine($"Thread {value.Id} emitted {value.Value}"); },
            ex => { Console.WriteLine($"Thread threw an exception: {ex.Message}"); },
            () => { Console.WriteLine("Thread has completed"); });


        Thread.Sleep(5000);

        sub1.Dispose();

        Console.ReadLine();
    }


    private static IObservable<OutputParameters> Poll()
    {
        return Observable.DeferAsync(Worker);
    }


    private static Task<IObservable<OutputParameters>> Worker(CancellationToken token)
    {
        var subject = new Subject<OutputParameters>();

        Task.Run(async () =>
        {
            var id = Guid.NewGuid();
            const int steps = 10;

            try
            {
                for (var i = 1; i <= steps && !token.IsCancellationRequested; i++)
                {
                    Console.WriteLine($"[IN THREAD] Thread {id}: Step {i} of {steps}");
                    subject.OnNext(new OutputParameters { Id = id, Value = i });

                    // This will actually throw an exception if it's the active call when
                    //  the token is cancelled.
                    //
                    await Task.Delay(1000, token);
                }
            }
            catch (TaskCanceledException ex)
            {
                // Interestingly, if this is triggered because the caller unsubscribed then
                //  this is unneeded...the caller isn't listening for this error anymore
                //
                subject.OnError(ex);
            }

            if (token.IsCancellationRequested)
            {
                Console.WriteLine($"[IN THREAD] Thread {id} was cancelled");
            }
            else
            {
                Console.WriteLine($"[IN THREAD] Thread {id} exiting normally");
                subject.OnCompleted();
            }
        }, token);

        return Task.FromResult(subject.AsObservable());
    }
}

The code above actually seems to cancel the background task almost immediately, as this is the output:

Requesting the polling operation
Subscribing to start the polling operation
[IN THREAD] Thread a470e6f4-2e62-4a3c-abe6-670bce39b6de: Step 1 of 10
Thread threw an exception: A task was canceled.
[IN THREAD] Thread a470e6f4-2e62-4a3c-abe6-670bce39b6de was cancelled

Attempt #2

I then tried making a small change to the Worker method to make it async and await the Task.Run call like so:

    private static async Task<IObservable<OutputParameters>> Worker(CancellationToken token)
    {
        var subject = new Subject<OutputParameters>();

        await Task.Run(async () =>
        {
            ...what happens in here is unchanged...
        }, token);

        return subject.AsObservable();
    }

The result here though makes it seem like the background task has complete control because it does run for about 5 seconds before being cancelled, but there's no output from the subscribe callbacks. Here's the complete output:

Requesting the polling operation
Subscribing to start the polling operation
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 1 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 2 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 3 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 4 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 5 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 6 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a was cancelled

My question

So it's clear that I don't fully understand what's happening here, or whether DeferAsync is even the right creation method for an observable in this case.

Is there a proper way to implement this type of approach?

Sam Storie
  • `Subject` has to start the async work of polling as soon as someone subscribes to it. Look into how to keep track of subscribers in c#. You can either use events or use the observer pattern, I would just use events. Then once all subscribers have unsubscribe, send a cancellation token to stop the polling. Most of this code should be in `Subject` so it is encapsulated and not within the `Program` class. – CodingYoshi Jun 26 '18 at 14:49
  • Also, first try it with a windows form application and once you have it working, search for `Task` in console apps and study that because it is a bit trickier with console. – CodingYoshi Jun 26 '18 at 14:52
  • Out of curiosity, was there a reason for the down vote? – Sam Storie Jun 26 '18 at 18:44
  • If you are asking me, I was wondering this myself. I think your question is well worded and pretty clear. – CodingYoshi Jun 26 '18 at 18:47
  • This sounds like an [XY problem](http://xyproblem.info/) situation. Do you need to mix TPL and Rx? If not, your solution would be much simpler by just picking one. – Shlomo Jun 26 '18 at 20:49
  • @Shlomo I want to retain Rx for the public API of this, but I am not tied to using tasks if there's an alternative. Do you know of any resources that demonstrate how to do this strictly with Rx? I'm not sure how to set up the code that would be running in the background, pumping messages out of the Subject using just Rx. – Sam Storie Jun 26 '18 at 21:11
  • @SamStorie - Don't use a `Subject` like you have in your question. Either wrap it in an `Observable.Defer` or try to avoid it altogether. Having it sit as a variable in the method causes it to be captured in your observable creating a run-once observable - and any error or completion signal sent will kill your observable for all time. – Enigmativity Jun 27 '18 at 13:04
  • In case you are concerned about overlapping interactions with the hardware, you could take a look at the [`ExhaustMap`](https://stackoverflow.com/questions/64353907/how-can-i-implement-an-exhaustmap-handler-in-rx-net) operator. – Theodor Zoulias Dec 04 '20 at 09:48
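
If tasks are to be kept, one way to read the comments above is to let Rx own the subscription lifetime instead of a shared `Subject`. The following is only a sketch, assuming the `Observable.Create` overload that hands the subscribe function a `CancellationToken`; `ReadValue` is a hypothetical stand-in for the hardware call and is not part of the question.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CreatePollSketch
{
    // Hypothetical stand-in for the hardware read; not from the question.
    private static int ReadValue(int step) => step;

    public static IObservable<int> Poll(TimeSpan interval) =>
        Observable.Create<int>(async (observer, token) =>
        {
            // This lambda runs once per subscription. The token is cancelled
            // when that subscription is disposed.
            for (var i = 1; !token.IsCancellationRequested; i++)
            {
                observer.OnNext(ReadValue(i));
                try
                {
                    await Task.Delay(interval, token);
                }
                catch (OperationCanceledException)
                {
                    break; // the subscriber has gone away, so stop quietly
                }
            }
        });

    public static void Main()
    {
        var subscription = Poll(TimeSpan.FromSeconds(1))
            .Subscribe(value => Console.WriteLine($"Received {value}"));

        Thread.Sleep(5000);
        subscription.Dispose(); // cancels the token, which ends the loop
        Console.ReadLine();
    }
}
```

Note that in this sketch the first iteration runs on the subscribing thread until the first `await`; only the later iterations continue on whatever thread resumes the task.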

1 Answer

This will do it, if an Rx-only solution will suffice. Much cleaner, if you ask me...

static IObservable<OutputParameters> Poll()
{
    const int steps = 10;
    return Observable.Defer<Guid>(() => Observable.Return(Guid.NewGuid()))
        .SelectMany(id => 
            Observable.Generate(1, i => i <= steps, i => i + 1, i => i, _ => TimeSpan.FromMilliseconds(1000))
                .ObserveOn(new EventLoopScheduler())
                .Do(i => Console.WriteLine($"[IN THREAD] Thread {id}: Step {i} of {steps}"))
                .Select(i => new OutputParameters { Id = id, Value = i })
        );
}

Explanation:

  • Generate is like a for-loop for Rx. The last argument controls when items are emitted. This is equivalent to your for loop + Task.Delay.
  • ObserveOn controls where/when the observable is observed. In this case, the EventLoopScheduler will spin up one new thread per subscriber, and all items from that observable will be observed on the new thread.
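
To make the for-loop comparison concrete, here is a minimal sketch of `Generate` on its own, with each argument labelled. The `Steps` helper and the class name are made up for illustration; disposing the subscription is what stops further steps.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class GenerateSketch
{
    // Rx counterpart of: for (var i = 1; i <= steps; i++) { ... wait interval ... }
    public static IObservable<int> Steps(int steps, TimeSpan interval) =>
        Observable.Generate(
            1,                // initial state      (var i = 1)
            i => i <= steps,  // loop condition     (i <= steps)
            i => i + 1,       // iterate            (i++)
            i => i,           // result selector: the value emitted for each state
            _ => interval);   // time selector: delay before each value is emitted

    public static void Main()
    {
        var subscription = Steps(10, TimeSpan.FromMilliseconds(1000))
            .Subscribe(
                i => Console.WriteLine($"Step {i} on thread {Environment.CurrentManagedThreadId}"),
                () => Console.WriteLine("Completed"));

        Thread.Sleep(5000);
        subscription.Dispose(); // no further steps are produced after this
        Console.ReadLine();
    }
}
```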

From Enigmativity:

static IObservable<OutputParameters> Poll()
{
    const int steps = 10;
    return Observable.Defer<OutputParameters>(() =>
    {
        var id = Guid.NewGuid();
        return Observable.Generate(1, i => i <= steps, i => i + 1, i => i,
                _ => TimeSpan.FromMilliseconds(1000), new EventLoopScheduler())
            .Do(i => Console.WriteLine($"[IN THREAD] Thread {id}: Step {i} of {steps}"))
            .Select(i => new OutputParameters { Id = id, Value = i });
    });
}
Enigmativity
Shlomo
  • You should wrap the observable inside an `Observable.Defer` to enable capturing of the `id` otherwise multiple subscribers will get the same `id`. – Enigmativity Jun 27 '18 at 13:06
  • Also, based on how the OP has asked the question I would use an `EventLoopScheduler` (which can be added as a parameter to the `.Generate` method). – Enigmativity Jun 27 '18 at 13:08
  • @Enigmativity What does the EventLoopScheduler bring to the table? I've never had a case to use that yet (but perhaps I should be!) – Sam Storie Jun 27 '18 at 13:15
  • @Shlomo Thanks for writing this up. I had never explored the .Generate approach, but that is much cleaner than trying to mix things. This works great for my MVP, so I'll take it back to our real app and try it there. – Sam Storie Jun 27 '18 at 13:16
  • "`NewThreadScheduler` will spin up one new thread" - No, it doesn't. It sets up a factory that creates a new thread for everything scheduled on it - `private readonly Func _threadFactory;`. It will give you a single thread for this one observable, but it will spin up as many threads as observables you create. – Enigmativity Jun 27 '18 at 13:17
  • @SamStorie - `EventLoopScheduler` ensures that every observable and every action scheduled on this observable uses one and only one thread and ensures then, of course, that there are no concurrent accesses performed - there's only one thread so two things can't run at once. It's a very neat way to allow multi-threaded code to run against non-thread safe code. – Enigmativity Jun 27 '18 at 13:19
  • @Enigmativity, thanks. Updated answer to include Guid initializing in `Defer`, and preference of `EventLoopScheduler` over `NewThreadScheduler`. – Shlomo Jun 27 '18 at 14:41
  • @Shlomo - My apologies for editing your answer. I thought I'd put up my variant of your answer as an edit to your answer rather than posting my own answer and making everything confusing. Please feel free to use it or delete it. The reason I posted it is that the `Return`/`SelectMany`/`ObserveOn` combination would use a thread from the thread-pool first before marshalling to the `EventLoopScheduler`. The way I've written it will avoid all of that and stick to the single thread. – Enigmativity Jun 28 '18 at 00:28
  • Thanks again to you both for the help. We have put this into our actual app and the approach works great. A small change I made was to let me pass in the scheduler so I can test easier using TestScheduler...something these answers helped me explore as well. Kudos! – Sam Storie Jun 28 '18 at 12:43
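
The scheduler-injection idea from the last comment might look roughly like the sketch below. It assumes the `Microsoft.Reactive.Testing` package for `TestScheduler`; the `Poll(int, IScheduler)` signature and the class name are illustrative guesses, not the code that went into the real app.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class SchedulerInjectionSketch
{
    // Hypothetical signature: the caller decides which scheduler drives the timing.
    public static IObservable<int> Poll(int steps, IScheduler scheduler) =>
        Observable.Generate(1, i => i <= steps, i => i + 1, i => i,
            _ => TimeSpan.FromSeconds(1), scheduler);

    public static void Main()
    {
        var scheduler = new TestScheduler();
        var seen = new List<int>();
        var subscription = Poll(10, scheduler).Subscribe(v => seen.Add(v));

        // Virtual time: nothing runs until the test advances the clock.
        scheduler.AdvanceBy(TimeSpan.FromSeconds(3.5).Ticks);
        Console.WriteLine($"Values so far: {string.Join(", ", seen)}");

        // After disposal, advancing the clock should produce nothing new.
        subscription.Dispose();
        var countAtDispose = seen.Count;
        scheduler.AdvanceBy(TimeSpan.FromSeconds(10).Ticks);
        Console.WriteLine($"New values after dispose: {seen.Count - countAtDispose}");
    }
}
```

In production code the same method could be handed a `new EventLoopScheduler()`, as in the answer above, so only the tests depend on virtual time.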