8

I want to run periodic tasks with the restriction that at most one execution of a method is running at any given time.

I was experimenting with Rx, but I am not sure how to impose an at-most-one concurrency restriction.

var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
timer.Subscribe(tick => DoSomething());

Additionally, if a task is still running when the next tick is due, I want that tick to be skipped, i.e. I don't want the tasks to queue up and cause problems.

I have 2 such tasks to execute periodically. The tasks being executed are currently synchronous, but I could make them async if necessary.

Theodor Zoulias
smartnut007

4 Answers

6

You are on the right track. Note that if your task takes longer than the interval, executions will start to stack up, since they can't run fast enough. You can use Select + Concat to flatten the observable and limit the number of in-flight requests:

var source = Observable.Interval(TimeSpan.FromMilliseconds(100))
          //I assume you are doing async work since you want to limit concurrency
          .Select(_ => Observable.FromAsync(() => DoSomethingAsync()))
          //This is equivalent to calling Merge(1)
          .Concat();

source.Subscribe(/*Handle the result of each operation*/);
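
The stacking noted above can be avoided by dropping ticks that arrive while an operation is still in flight. The following is a minimal sketch of one way to do that, not part of the original answer: the `DropWhileBusyDemo` class, the `_busy` flag, and the 250 ms `DoSomethingAsync` stand-in are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class DropWhileBusyDemo
{
    private static readonly object Gate = new object();
    private static int _busy;         // 0 = idle, 1 = an operation is in flight
    private static int _running;      // operations executing right now
    public static int MaxConcurrency; // highest value of _running observed

    // Hypothetical stand-in for the real work; it outlasts the 100 ms interval.
    private static async Task<long> DoSomethingAsync(long tick)
    {
        lock (Gate) { _running++; MaxConcurrency = Math.Max(MaxConcurrency, _running); }
        await Task.Delay(250);
        lock (Gate) { _running--; }
        return tick;
    }

    public static List<long> Run(TimeSpan duration)
    {
        var completed = new List<long>();
        var source = Observable.Interval(TimeSpan.FromMilliseconds(100))
            // Let a tick through only if it can claim the busy flag; other ticks are dropped.
            .Where(_ => Interlocked.CompareExchange(ref _busy, 1, 0) == 0)
            .Select(tick => Observable
                .FromAsync(() => DoSomethingAsync(tick))
                // Release the flag when the operation completes, fails, or is unsubscribed.
                .Finally(() => Interlocked.Exchange(ref _busy, 0)))
            .Concat();

        using (source.Subscribe(tick => { lock (completed) completed.Add(tick); }))
        {
            Thread.Sleep(duration); // keep the subscription alive for the demo
        }
        lock (completed) return new List<long>(completed);
    }
}
```

Calling `DropWhileBusyDemo.Run(TimeSpan.FromSeconds(1))` returns the ticks whose operations completed. Because rejected ticks never reach `Concat`, nothing accumulates behind a slow operation.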
paulpdaniels
  • Thanks. I made some modifications to the question. How can I ensure that tasks do not queue up when a task takes longer than the interval? Currently the tasks are sync, but I am open to async. – smartnut007 Jul 14 '15 at 17:04
6

You should have tested your code as is because this is exactly what Rx imposes already.

Try this as a test:

void Main()
{
    var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
    using (timer.Do(x => Console.WriteLine("!")).Subscribe(tick => DoSomething()))
    {
        Console.ReadLine();
    }
}

private void DoSomething()
{
    Console.Write("<");
    Console.Write(DateTime.Now.ToString("HH:mm:ss.fff"));
    Thread.Sleep(1000);
    Console.WriteLine(">");
}

When you run this you'll get this kind of output:

!
<16:54:57.111>
!
<16:54:58.112>
!
<16:54:59.113>
!
<16:55:00.113>
!
<16:55:01.114>
!
<16:55:02.115>
!
<16:55:03.116>
!
<16:55:04.117>
!
<16:55:05.118>
!
<16:55:06.119

It is already ensuring that there's no overlap.
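
As a complement to reading the console output, the no-overlap claim can also be checked with a counter. This is a small sketch under stated assumptions: the `OverlapCheck` class and the 300 ms sleep are made up for illustration, and the check only covers a handler that does all of its work synchronously.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class OverlapCheck
{
    public static (int Calls, int MaxConcurrent) Run(TimeSpan duration)
    {
        int running = 0, calls = 0, maxConcurrent = 0;
        var gate = new object();

        var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
        using (timer.Subscribe(_ =>
        {
            lock (gate)
            {
                running++;
                calls++;
                maxConcurrent = Math.Max(maxConcurrent, running);
            }
            Thread.Sleep(300); // synchronous work that outlasts the interval
            lock (gate) running--;
        }))
        {
            Thread.Sleep(duration); // keep the subscription alive for the demo
        }
        lock (gate) return (calls, maxConcurrent);
    }
}
```

If the handler never overlaps with itself, `Run(TimeSpan.FromSeconds(1)).MaxConcurrent` stays at 1.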

Enigmativity
  • This is true as long as `DoSomething` isn't kicking anything off in the background. – paulpdaniels Jul 14 '15 at 16:06
  • Got it. I made some modifications to the question. How can I ensure that tasks do not queue up when a task takes longer than the interval? – smartnut007 Jul 14 '15 at 17:04
  • @smartnut007 - Again Rx takes care of it. That's the reason I put the `"!"` in the `.Do(...)` method off of the interval and why I set the interval to `100` milliseconds. If it were queuing I would have got 10x as many `"!"` as the `<16:54:57.111>`, but I didn't. It doesn't queue. – Enigmativity Jul 14 '15 at 22:50
  • @smartnut007 - @Enigmativity has the correct answer if your tasks are _synchronous_. `Interval` uses recursive scheduling, meaning that the next interval will only be scheduled _after_ the completion of your action. As a result, every operation is guaranteed to be 100 milliseconds apart – paulpdaniels Jul 15 '15 at 17:56
  • @paulpdaniels - I think strictly speaking they are guaranteed to be 100ms **apart** (in other words, the gap), but the starting times won't be every 100ms. – Enigmativity Jul 15 '15 at 23:03
  • @Enigmativity - correct, I meant "relative to the end of the previous action" – paulpdaniels Jul 16 '15 at 00:03
2

Below are two implementations of a PeriodicSequentialExecution method that creates an observable by executing an asynchronous method periodically while enforcing a no-overlapping-execution policy. The interval between subsequent executions can be extended to prevent overlapping, in which case the period is time-shifted accordingly.

The first implementation is purely functional, while the second implementation is mostly imperative. Both implementations are functionally identical. The first one can be supplied with a custom IScheduler. The second one may be slightly more efficient.

The functional implementation:

/// <summary>
/// Creates an observable sequence containing the results of an asynchronous
/// action that is invoked periodically and sequentially (without overlapping).
/// </summary>
public static IObservable<T> PeriodicSequentialExecution<T>(
    Func<CancellationToken, Task<T>> action,
    TimeSpan dueTime, TimeSpan period,
    CancellationToken cancellationToken = default,
    IScheduler scheduler = null)
{
    // Arguments validation omitted
    scheduler ??= DefaultScheduler.Instance;
    return Delay(dueTime) // Initial delay
        .Concat(Observable.Using(() => CancellationTokenSource.CreateLinkedTokenSource(
            cancellationToken), linkedCTS => 
            // Execution loop
            Observable.Publish( // Start a hot delay timer before each operation
                Delay(period), hotTimer => Observable
                    .StartAsync(() => action(linkedCTS.Token)) // Start the operation
                    .Concat(hotTimer) // Await the delay timer
            )
            .Repeat()
            .Finally(() => linkedCTS.Cancel()) // Unsubscription: cancel the operation
        ));

    IObservable<T> Delay(TimeSpan delay)
        => Observable
            .Timer(delay, scheduler)
            .IgnoreElements()
            .Select(_ => default(T))
            .TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
                o.OnError(new OperationCanceledException(cancellationToken)))));
}

The imperative implementation:

public static IObservable<T> PeriodicSequentialExecution2<T>(
    Func<CancellationToken, Task<T>> action,
    TimeSpan dueTime, TimeSpan period,
    CancellationToken cancellationToken = default)
{
    // Arguments validation omitted
    return Observable.Create<T>(async (observer, ct) =>
    {
        using (var linkedCTS = CancellationTokenSource.CreateLinkedTokenSource(
            ct, cancellationToken))
        {
            try
            {
                await Task.Delay(dueTime, linkedCTS.Token);
                while (true)
                {
                    var delayTask = Task.Delay(period, linkedCTS.Token);
                    var result = await action(linkedCTS.Token);
                    observer.OnNext(result);
                    await delayTask;
                }
            }
            catch (Exception ex) { observer.OnError(ex); }
        }
    });
}

The cancellationToken parameter can be used for the graceful termination of the resulting observable sequence. This means that the sequence waits for the currently running operation to complete before terminating. If you prefer it to terminate instantaneously, potentially leaving work running unobserved in a fire-and-forget fashion, you can simply dispose the subscription to the observable sequence as usual. Canceling the cancellationToken results in the observable sequence completing in a faulted state (OperationCanceledException).
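
A usage sketch of the graceful-termination path may help. It repeats a condensed copy of the imperative implementation only so that it compiles on its own. The `PeriodicUsageSketch` class, the 50 ms action, and the 500 ms run time are assumptions chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PeriodicUsageSketch
{
    // Condensed copy of the imperative implementation, repeated for self-containment.
    public static IObservable<T> PeriodicSequentialExecution2<T>(
        Func<CancellationToken, Task<T>> action,
        TimeSpan dueTime, TimeSpan period,
        CancellationToken cancellationToken = default)
    {
        return Observable.Create<T>(async (observer, ct) =>
        {
            using (var linkedCTS = CancellationTokenSource.CreateLinkedTokenSource(
                ct, cancellationToken))
            {
                try
                {
                    await Task.Delay(dueTime, linkedCTS.Token);
                    while (true)
                    {
                        var delayTask = Task.Delay(period, linkedCTS.Token);
                        observer.OnNext(await action(linkedCTS.Token));
                        await delayTask;
                    }
                }
                catch (Exception ex) { observer.OnError(ex); }
            }
        });
    }

    public static async Task<(List<int> Results, Exception Error)> RunAsync()
    {
        var results = new List<int>();
        var terminated = new TaskCompletionSource<Exception>();
        var counter = 0;
        using (var cts = new CancellationTokenSource())
        using (PeriodicSequentialExecution2(
                // Hypothetical action: it ignores the token, so it always runs to completion.
                async token => { await Task.Delay(50); return ++counter; },
                TimeSpan.Zero, TimeSpan.FromMilliseconds(100), cts.Token)
            .Subscribe(
                x => results.Add(x),
                ex => terminated.TrySetResult(ex),
                () => terminated.TrySetResult(null)))
        {
            await Task.Delay(500);
            cts.Cancel();                        // request graceful termination
            var error = await terminated.Task;   // wait for the sequence to fault
            return (results, error);
        }
    }
}
```

After `RunAsync` completes, `Error` should hold an OperationCanceledException (or a derived type), and `Results` holds the values produced before cancellation.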

Theodor Zoulias
0

Here is a factory function that does exactly what you are asking for.

public static IObservable<Unit> Periodic(TimeSpan timeSpan)
{
    return Observable.Return(Unit.Default).Concat(Observable.Return(Unit.Default).Delay(timeSpan).Repeat());
}

Here is an example usage:

Periodic(TimeSpan.FromSeconds(1))
    .Subscribe(x =>
    {
        Console.WriteLine(DateTime.Now.ToString("mm:ss:fff"));
        Thread.Sleep(500);
    });

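If you run this, the console prints will be roughly 1.5 seconds apart: the 1-second delay plus the 500 ms of work, because the next delay starts only after the handler returns.

Note: if you don't want the first tick to run immediately, you could instead use this factory, which won't send the first Unit until after the timespan has elapsed.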

public static IObservable<Unit> DelayedPeriodic(TimeSpan timeSpan)
{
    return Observable.Return(Unit.Default).Delay(timeSpan).Repeat();
}
Jeffrey Patterson