10

I have a very simple IObservable<int> that acts as a pulse generator every 500ms:

var pulses = Observable.GenerateWithTime(0, i => true, i => i + 1, i => i,
                                         i => TimeSpan.FromMilliseconds(500))

And I have a CancellationTokenSource (that is used to cancel other work that is going on simultaneously).

How can I use the cancellation token source to cancel my observable sequence?

Ronald Wildenberg
  • 31,634
  • 14
  • 90
  • 133

5 Answers5

12

It is an old thread, but just for future reference, here is a simpler way to do it.

If you have a CancellationToken, you are probably already working with tasks. So, just convert it to a Task and let the framework do the binding:

using System.Reactive.Threading.Tasks;
...
var task = myObservable.ToTask(cancellationToken);

This will create an internal subscriber that will be disposed when the task is cancelled. This will do the trick in most cases because most observables only produce values if there are subscribers.

Now, if you have an actual observable that needs to be disposed for some reason (maybe a hot observable that is not important anymore if a parent task is cancelled), this can be achieved with a continuation:

disposableObservable.ToTask(cancellationToken).ContinueWith(t => {
    if (t.Status == TaskStatus.Canceled)
        disposableObservable.Dispose();
});
Natan
  • 4,686
  • 5
  • 30
  • 48
8

If you're using the GenerateWithTime (replaced now with Generate passing in a timespan func overload), you can replace the second parameter to evaulate the state of the cancellation token as follows:

var pulses = Observable.Generate(0,
    i => !ts.IsCancellationRequested,
    i => i + 1,
    i => i,
    i => TimeSpan.FromMilliseconds(500));

Alternatively, if your event which causes the cancellation token to be set can be converted to an observable itself, you could use something like the following:

pulses.TakeUntil(CancelRequested);

I posted a more detailed explanation at http://www.thinqlinq.com/Post.aspx/Title/Cancelling-a-Reactive-Extensions-Observable as well.

Jim Wooley
  • 10,169
  • 1
  • 25
  • 43
5

Here are two handy operators for canceling observable sequences. The difference between them is on what happens in case of cancellation. The TakeUntil causes a normal completion of the sequence (OnCompleted), while the WithCancellation causes an exceptional termination (OnError).

/// <summary>Returns the elements from the source observable sequence until the
/// CancellationToken is canceled.</summary>
public static IObservable<TSource> TakeUntil<TSource>(
    this IObservable<TSource> source, CancellationToken cancellationToken)
{
    return source
        .TakeUntil(Observable.Create<Unit>(observer =>
            cancellationToken.Register(() => observer.OnNext(default))));
}

/// <summary>Ties a CancellationToken to an observable sequence. In case of
/// cancellation propagates an OperationCanceledException to the observer.</summary>
public static IObservable<TSource> WithCancellation<TSource>(
    this IObservable<TSource> source, CancellationToken cancellationToken)
{
    return source
        .TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
            o.OnError(new OperationCanceledException(cancellationToken)))));
}

Usage example:

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

var pulses = Observable
    .Generate(0, i => true, i => i + 1, i => i, i => TimeSpan.FromMilliseconds(500))
    .WithCancellation(cts.Token);

Note: In case of cancellation, the custom operators presented above are unsubscribing instantly from the underlying observable. This is something to consider in case the observable includes side-effects. Putting the TakeUntil(cts.Token) before the operator that performs the side-effects will postpone the completion of the whole observable, until the side-effects are completed (graceful termination). Putting it after the side-effects will make the cancellation instantaneous, resulting potentially to any running code to continue running unobserved, in a fire-and-forget fashion.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • 1
    Just stumbled upon your answer, having submitted this: https://github.com/dotnet/reactive/issues/1773. Sinergy again! :) – noseratio May 30 '22 at 23:46
  • 1
    Hi @noseratio! I am now a bit afraid with the custom `TakeUntil(CancellationToken)` operator, because of [this](https://stackoverflow.com/questions/66630603/strange-rxcancellationtoken-issue-sometimes-the-registered-callback-does-not-c) issue. Also I saw that you are using the `Finally` operator in the GitHub example. Be careful with this operator. If the action is not bulletproof, it might [crash the process](https://stackoverflow.com/questions/69815945/how-to-implement-a-better-finally-rx-operator)! – Theodor Zoulias May 31 '22 at 00:46
  • Thanks for the head-ups Theodor, very interesting! So far, my `Finally` callback just calls another `cts.Cancel`, and the chained cancel callbacks should be all properly guarded. But still... do you think this implementation might be better than via `TakeUntil`? https://i.stack.imgur.com/mDZM1.png – noseratio May 31 '22 at 00:57
  • 1
    @noseratio regarding the [screenshot](https://i.stack.imgur.com/mDZM1.png), I think that the `observer.OnError` should be called before disposing the subscription. Calling it afterwards it will probably be muted. But I might be wrong. – Theodor Zoulias May 31 '22 at 01:11
  • 1
    This might be a valid concern. I've just stumbled upon an interesting document from 2010 where I hope to find an answer about that, too: https://stackoverflow.com/a/72441341/1768303 – noseratio May 31 '22 at 01:30
2

You can connect your IObservable subscription with CancellationTokenSource with the following snippet

var pulses = Observable.GenerateWithTime(0,
    i => true, i => i + 1, i => i,
    i => TimeSpan.FromMilliseconds(500));

// Get your CancellationTokenSource
CancellationTokenSource ts = ...

// Subscribe
ts.Token.Register(pulses.Subscribe(...).Dispose);
StanislawSwierc
  • 2,571
  • 17
  • 23
  • Both your answer and Jim's are valid methods to accomplish this. Your's can be applied to all kinds of observable but Jim's is a little more concise when using one of the `Generate` methods. – Ronald Wildenberg Jul 22 '11 at 07:40
0

You get an IDisposable instance back from subscribing. Call Dispose() on that.

corvuscorax
  • 5,850
  • 3
  • 30
  • 31
  • I know, but than I need to have some other process that polls the cancellation status and disposes of the subscription when cancellation occurs. I was looking for something more automatic, somehow linking my cancellation token source to my observable. – Ronald Wildenberg Jul 20 '11 at 09:54