The following code is from an answer to "What's a good way to run periodic tasks using Rx, with a single concurrent execution restriction?":

void Main()
{
    var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
    using (timer.Do(x => Console.WriteLine("!")).Subscribe(tick => DoSomething()))
    {
        Console.ReadLine();
    }
}

private void DoSomething()
{
    Console.Write("<");
    Console.Write(DateTime.Now.ToString("HH:mm:ss.fff"));
    Thread.Sleep(1000);
    Console.WriteLine(">");
}

I'm trying to add cancellation and test stopping the program after five seconds.

using System.Reactive.Linq;

Task DoSomething(CancellationToken cancellationToken=default)
{
    if (cancellationToken.IsCancellationRequested) { return Task.CompletedTask;  }
    Console.Write("<");
    Console.Write(DateTime.Now.ToString("HH:mm:ss.fff"));
    Thread.Sleep(1000);
    Console.WriteLine(">");
    return Task.CompletedTask;
}

async Task WaitAndThenCancel(int seconds, CancellationTokenSource cancellationTokenSource)
{
    await Task.Delay(seconds*1000);
    Console.WriteLine("Cancelling...");
    cancellationTokenSource.Cancel();
}

void Main(CancellationToken cancellationToken=default)
{
    var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
    using (timer.Do(x =>
    {
        if (cancellationToken.IsCancellationRequested)
        {
            Console.WriteLine("Canceled - Main");
            return; // Need to stop the stream here
        }
        Console.WriteLine("!");
    }).Subscribe(async tick => await DoSomething(cancellationToken)))
    {
        Console.ReadLine();
    }
}

var ct = new CancellationTokenSource();
WaitAndThenCancel(5, ct);
Main(ct.Token);

I expect the code to print the current time for N seconds, then print "Canceled - Main" and stop. However, it starts printing "Canceled - Main" after N seconds and never stops:

!
<15:00:23.823>
!
<15:00:24.836>
!
<15:00:25.853>
!
<15:00:26.860>
!
<15:00:27.863Cancelling...
>
Canceled - Main
Canceled - Main
Canceled - Main
Canceled - Main
Canceled - Main
Canceled - Main
Canceled - Main
Canceled - Main
Canceled - Main
Canceled - Main
Canceled - Main
Canceled - Main
Canceled - Main
....

Update: to support an async DoSomething(), using TakeUntil():

using System.Reactive;
using System.Reactive.Linq;

async Task DoSomething(CancellationToken cancellationToken = default)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return; // Task.CompletedTask;
    }
    Console.Write("<");
    Console.Write(DateTime.Now.ToString("HH:mm:ss.fff"));
    await Task.Delay(1000); // Thread.Sleep(1000);
    Console.WriteLine(">");
}

async Task Main3(CancellationToken cancellationToken = default)
{
    var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
    var cancel = Observable.Create<Unit>(observer => cancellationToken.Register(() => { 
        // observer.OnNext(default); 
        observer.OnCompleted(); }));
    using (timer.Do(x =>
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    Console.WriteLine("Canceled - Main");
                    return;
                }
                Console.WriteLine("do!");
            })
            .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(5.0)))
            .TakeUntil(cancel)
            .Select(_ => Observable.FromAsync(() => DoSomething(cancellationToken)))
            .Concat()
            .Subscribe())
    {
        Console.WriteLine("Will wait for timed cancelation here.");
        try
        {
            await Task.Delay(Timeout.Infinite, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($">{Environment.NewLine}Canceled - Main. In Using");
        }
    }
}

var ct = new CancellationTokenSource();
ct.CancelAfter(5000);
await Main3(ct.Token);
ca9163d9
  • You are mixing Rx with Task; I assume that is due to inexperience? Treat your observables as a unit: await myMainObservable.TakeUntil(emitAfter5Seconds) – Apostolis Bekiaris Sep 03 '22 at 08:23
  • I'm new to Rx, the `Wait()` is for testing cancellation - it cancels the process after 5 seconds by setting the variable. – ca9163d9 Sep 03 '22 at 10:09
  • @ApostolisBekiaris, the question is updated. there were some issues in the old question. – ca9163d9 Sep 04 '22 at 19:05

2 Answers

If you want to run an observable and stop it after a set time interval then you should simply use .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(5.0))).

var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));

var subscription =
    timer
        .Do(x => Console.WriteLine("!"))
        .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(5.0)))
        .Subscribe(tick => DoSomething());
        
using (subscription)
{
    Console.ReadLine();
}

If you want to use a CancellationToken then you could use this:

var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
CancellationTokenSource cts = new CancellationTokenSource();
CancellationToken ct = cts.Token;
var cancel = Observable.Create<Unit>(observer => cts.Token.Register(() => { observer.OnNext(default); observer.OnCompleted(); }));

var subscription =
    timer
        .Do(x => Console.WriteLine("!"))
        .TakeUntil(cancel)
        .Subscribe(tick => DoSomething());
        
using (subscription)
{
    await Task.Delay(TimeSpan.FromSeconds(5.0));
    cts.Cancel();
    Console.ReadLine();
}
Enigmativity
  • Can I do both `.TakeUntil(Observable.Timer(TimeSpan.FromSeconds(5.0)))` and using `cancellationToken`? Using two `TakeUntil()`? – ca9163d9 Sep 11 '22 at 22:55
  • @ca9163d9 - You can try and find out. These fluent interfaces are pretty cool. – Enigmativity Sep 11 '22 at 23:00
  • Out of curiosity, in `Observable.Create<Unit>(observer => cts.Token.Register(() => { observer.OnNext(default); observer.OnCompleted(); }));`, why do we need `observer.OnNext(default);`? I tried removing it and it seems to work too. – ca9163d9 Sep 11 '22 at 23:09
  • @ca9163d9 - It didn't work for me. You must have the `OnNext`. Can you show me code where it worked for you? – Enigmativity Sep 11 '22 at 23:53
  • I've posted the code screenshot at the end of the question. – ca9163d9 Sep 12 '22 at 00:01
  • @ca9163d9 - Please don't post screenshots. Please post the actual code so I can run it. – Enigmativity Sep 12 '22 at 00:04
  • OK. Posted the code. – ca9163d9 Sep 12 '22 at 00:08
  • Your `await Task.Delay(Timeout.Infinite, cancellationToken);` exits when you cancel the token, so the `using` block ends and the subscription is disposed. The `cancel` observable isn't being hit at all. – Enigmativity Sep 12 '22 at 00:09
  • @ca9163d9 - Sorry, forgot to `@` you. – Enigmativity Sep 12 '22 at 00:09
  • It did print the line, though: `Console.WriteLine($">{Environment.NewLine}Canceled - Main. In Using");`. – ca9163d9 Sep 12 '22 at 00:18
  • @ca9163d9 - Yes, because the `await Task.Delay(Timeout.Infinite, cancellationToken);` was cancelled. – Enigmativity Sep 12 '22 at 00:31
  • @ca9163d9 - If you comment out the `.TakeUntil(cancel)` line it still cancels the same way. You're just calling `Dispose` on the subscription. – Enigmativity Sep 12 '22 at 00:32
  • I added `Console.ReadLine();` at the end, and if I comment out `.TakeUntil(Observable.Timer(TimeSpan.FromSeconds(5.0)))` (so only `TakeUntil(cancel)` remains), it keeps printing `Canceled - Main` after five seconds, with or without `observer.OnNext(default);`. – ca9163d9 Sep 12 '22 at 01:22
  • @ca9163d9 - I made the changes to the code you described. When I include `observer.OnNext(default);` the observable stops. When I comment it out it goes on forever. – Enigmativity Sep 12 '22 at 01:30
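The difference discussed in these comments can be reproduced without timers. The following is a minimal sketch, assuming the System.Reactive package is referenced; the class name `TakeUntilCancelDemo` and the method `Run` are made up for illustration, and a `Subject<int>` stands in for `Observable.Interval` so that the outcome does not depend on timing. It reuses the `cancel` observable from the answer above and toggles whether `OnNext` is called before `OnCompleted`.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical helper, not part of Rx: shows how TakeUntil reacts to the "cancel" observable.
public static class TakeUntilCancelDemo
{
    // Returns the values that made it through source.TakeUntil(cancel).
    public static List<int> Run(bool emitOnCancel)
    {
        var source = new Subject<int>();          // stands in for Observable.Interval
        var cts = new CancellationTokenSource();

        // Same shape as the answer's "cancel" observable; OnNext is optional here.
        var cancel = Observable.Create<Unit>(observer => cts.Token.Register(() =>
        {
            if (emitOnCancel) observer.OnNext(Unit.Default);
            observer.OnCompleted();
        }));

        var seen = new List<int>();
        using (source.TakeUntil(cancel).Subscribe(x => seen.Add(x)))
        {
            source.OnNext(1);
            cts.Cancel();                         // runs the registered callback synchronously
            source.OnNext(2);
            source.OnNext(3);
        }
        return seen;
    }
}
```

`Run(true)` should return only the first value, while `Run(false)` lets all three through: `TakeUntil` ends the source when the other sequence produces an element, not when it merely completes.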

Your Main never cancels the subscription because Console.ReadLine() blocks inside the using block, so the subscription is not disposed until a line is entered. I suggest a slightly different approach:

async Task Main(CancellationToken cancellationToken)
{
    var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
    using (timer.Do(x =>
           {
               if (cancellationToken.IsCancellationRequested)
               {
                   Console.WriteLine("Canceled - Main");
                   return;
               }
               Console.WriteLine("!");
           }).Subscribe(async tick => await DoSomething(cancellationToken)))
    {
        Console.WriteLine("Will wait for timed cancelation here.");
        try
        {
            await Task.Delay(Timeout.Infinite, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($">{Environment.NewLine}Canceled - Main. In Using");
        }
    }
}

var ct = new CancellationTokenSource();
ct.CancelAfter(5000); // no need to reinvent the wheel; just use the built-in method for timed cancellation
await Main(ct.Token);

One more note: Subscribe is not task-aware, i.e. it will not handle "truly" async methods correctly. For example, if you change the DoSomething method to the following:

async Task DoSomething(CancellationToken cancellationToken = default)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    Console.Write("<");
    Console.Write(DateTime.Now.ToString("HH:mm:ss.fff"));
    await Task.Delay(1000);
    Console.WriteLine(">");
}

The result will differ from what is expected/wanted. One approach to handle that is:

using (timer.Do(x =>
           {
               if (cancellationToken.IsCancellationRequested)
               {
                   Console.WriteLine("Canceled - Main");
                   return;
               }

               Console.WriteLine("!");
           })
           .Select(_ => Observable.FromAsync(() => DoSomething(cancellationToken)))
           .Concat()
           .Subscribe())
{
     // ...
}
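To check that the Select/FromAsync/Concat pipeline above really runs one call at a time, here is a small sketch, assuming the System.Reactive package is referenced. The class `SerializedTicksDemo`, the method `MaxOverlapAsync`, and the local function `DoWork` are made-up names for illustration, and the 5 ms / 20 ms timings are arbitrary. It uses the `FromAsync` overload that hands the function a `CancellationToken` tied to the subscription, and counts how many calls are in flight at once.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of Rx: measures overlap between async calls.
public static class SerializedTicksDemo
{
    // Returns the highest number of DoWork calls that were running at the same time.
    public static async Task<int> MaxOverlapAsync(int ticks)
    {
        var gate = new object();
        int running = 0, peak = 0;

        async Task DoWork(CancellationToken token)
        {
            lock (gate) { running++; peak = Math.Max(peak, running); }
            await Task.Delay(20, token);          // work takes longer than the tick interval
            lock (gate) { running--; }
        }

        await Observable.Interval(TimeSpan.FromMilliseconds(5))
            .Take(ticks)
            // FromAsync is deferred: DoWork starts only when Concat subscribes to it.
            .Select(_ => Observable.FromAsync(token => DoWork(token)))
            .Concat()                             // subscribes to one inner sequence at a time
            .LastOrDefaultAsync();                // awaiting waits for the whole pipeline to finish

        return peak;
    }
}
```

Even though ticks arrive faster than the work completes, `await SerializedTicksDemo.MaxOverlapAsync(5)` should report a peak of 1, because `Concat` queues the pending calls rather than starting them concurrently.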
halfer
Guru Stron
  • Thanks, the last piece of code is similar to another highly voted answer to the original question I referred to. Is it the better approach? I couldn't make it work in my test console program (it didn't print anything). Will try it again. – ca9163d9 Sep 04 '22 at 19:26
  • I tried to run `using (timer.Do(x => {....}).Select(_ => Observable.FromAsync(() => DoSomething(cancellationToken))).Concat().Subscribe()){}`, but it didn't print anything. – ca9163d9 Sep 04 '22 at 19:57
  • @ca9163d9 my answer contains full runnable code which I've used for testing (except for the original `DoSomething` implementation). – Guru Stron Sep 04 '22 at 20:00
  • hmm, I've posted the full source code screenshot in the question. – ca9163d9 Sep 04 '22 at 20:11
  • @ca9163d9 Yep, that is missing some crucial parts of my code, i.e. `await Task.Delay(Timeout.Infinite, cancellationToken);` and `async` for `Main`. Without them, the `Dispose` called by `using` will cancel the subscription immediately (`Console.ReadLine();` served a similar purpose in the original code). – Guru Stron Sep 04 '22 at 20:12
  • It works after I added the missing parts. However, I found that it also works after removing `if (cancellationToken.IsCancellationRequested) { return; }` from `timer.Do(x => { ... })`. Is it needed? – ca9163d9 Sep 04 '22 at 21:58
  • @ca9163d9 I would say - no, to my understanding it was added for testing purposes. – Guru Stron Sep 04 '22 at 22:11