We have a situation where we want to start a background "polling" operation in a C# application that returns values periodically using reactive extensions. The process we'd like to implement is the following:
- A caller calls a method like
Poll()
that returns anIObservable
- The caller subscribes to said observable, and that starts a background thread/task that interacts with hardware to retrieve values on some interval
- When the caller is done it disposes of the subscription and that automatically stops the background thread/task
Attempt #1
To try to prove this out I wrote the following console app, but this isn't acting the way I was expecting:
public class OutputParameters
{
public Guid Id { get; set; }
public int Value { get; set; }
}
public class Program
{
static void Main(string[] args)
{
Console.WriteLine("Requesting the polling operation");
var worker1 = Poll();
Console.WriteLine("Subscribing to start the polling operation");
var sub1 = worker1.Subscribe(
value => { Console.WriteLine($"Thread {value.Id} emitted {value.Value}"); },
ex => { Console.WriteLine($"Thread threw an exception: {ex.Message}"); },
() => { Console.WriteLine("Thread has completed"); });
Thread.Sleep(5000);
sub1.Dispose();
Console.ReadLine();
}
private static IObservable<OutputParameters> Poll()
{
return Observable.DeferAsync(Worker);
}
private static Task<IObservable<OutputParameters>> Worker(CancellationToken token)
{
var subject = new Subject<OutputParameters>();
Task.Run(async () =>
{
var id = Guid.NewGuid();
const int steps = 10;
try
{
for (var i = 1; i <= steps || token.IsCancellationRequested; i++)
{
Console.WriteLine($"[IN THREAD] Thread {id}: Step {i} of {steps}");
subject.OnNext(new OutputParameters { Id = id, Value = i });
// This will actually throw an exception if it's the active call when
// the token is cancelled.
//
await Task.Delay(1000, token);
}
}
catch (TaskCanceledException ex)
{
// Interestingly, if this is triggered because the caller unsibscribed then
// this is unneeded...the caller isn't listening for this error anymore
//
subject.OnError(ex);
}
if (token.IsCancellationRequested)
{
Console.WriteLine($"[IN THREAD] Thread {id} was cancelled");
}
else
{
Console.WriteLine($"[IN THREAD] Thread {id} exiting normally");
subject.OnCompleted();
}
}, token);
return Task.FromResult(subject.AsObservable());
}
}
The code above actually seems to cancel the background task almost immediately, as this is the output:
Requesting the polling operation
Subscribing to start the polling operation
[IN THREAD] Thread a470e6f4-2e62-4a3c-abe6-670bce39b6de: Step 1 of 10
Thread threw an exception: A task was canceled.
[IN THREAD] Thread a470e6f4-2e62-4a3c-abe6-670bce39b6de was cancelled
Attempt #2
I then tried making a small change to the Worker
method to make it async and await the Task.Run
call like so:
private static async Task<IObservable<OutputParameters>> Worker(CancellationToken token)
{
var subject = new Subject<OutputParameters>();
await Task.Run(async () =>
{
...what happens in here is unchanged...
}, token);
return subject.AsObservable();
}
The result here though makes it seem like the background task has complete control because it does run for about 5 seconds before being cancelled, but there's no output from the subscribe callbacks. Here's the complete output:
Requesting the polling operation
Subscribing to start the polling operation
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 1 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 2 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 3 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 4 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 5 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 6 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a was cancelled
My question
So it's clear that I don't fully understand what's happening here, or that using DeferAsync
is the right creation method for an observable in this case.
Is there a proper way to implement this type of approach?