
I have asynchronous subscribe logic, and while it is running I don't want it to run again until it has finished. If any events are observed while the logic is running, they should be throttled until the subscribe logic finishes, and then the logic should fire again immediately.

So I wish that I could write

observable.SubscribeAsync(async (data) => await DoSomething(data));

Or, more precisely: I want to throttle the input data; not skip it or wait on it, but throttle it. So I think it would be better to have a ThrottleWhile operator (and maybe also DebounceWhile or ThrottleUntil):

bool running = false;
// ThrottleWhile is the hypothetical operator being asked for here.
observable.ThrottleWhile((_) => running).Subscribe(async (data) => {
    try
    {
        running = true;
        await DoSomething(data);
    }
    finally
    {
        running = false;
    }
});

Is this already possible? How can I build this flow with the reactive pattern?

Thaina Yu
  • This line leaves me uncertain what you are expecting "I want to Throttle the input data, not skipping or waiting but throttling it." Any throttle or debounce functions I've seen before skip or ignore values at certain timing intervals. To clarify what you are looking for, would `Observable.Interval(1 sec).Take(10).ThrottleWhile(takes 1.5s to complete)` process every value eventually, every other value (ignoring ones that come in while a function is running), or something else? – Gideon Engelberth Aug 10 '20 at 21:41
  • @GideonEngelberth Yes, the main point is that a normal `throttle` will _skip or ignore values at certain timing intervals_, and that is what I don't want. What I want is throttling that is independent of time. If the state is `false`, I want the value to go through immediately. But if the state is `true`, I want it to behave the same way as throttle, caching the most recent value so it is ready to be sent once the state clears, hence `ThrottleWhile` – Thaina Yu Aug 11 '20 at 03:36

2 Answers


I have not kept up with the library, so it's possible a built-in operator has been added, but I am not aware of one in the version I use. Here's a basic version of what it sounds like you were asking for:

static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext,
                                     Action<Exception> onError, Action onComplete)
{
    //argument error checking omitted for brevity
    T current = default(T);
    bool processing = false;
    bool haveCurrent = false;

    return source
           .Where((v) =>
                  {
                      if (processing)
                      {
                          current = v;
                          haveCurrent = true;
                      }
                      return !processing;
                  })
           .Subscribe((v) =>
                      {
                          Action<Task> runNext = null;
                          runNext = (task) =>
                              {
                                  if (haveCurrent)
                                  {
                                      haveCurrent = false;
                                      onNext(current).ContinueWith(runNext);
                                  }
                                  else
                                  {
                                      processing = false;
                                  }
                              };
                          processing = true;
                          onNext(v).ContinueWith(runNext);
                      },
                      onError,
                      onComplete);
}

Some notes to consider with this implementation:

The last call to onNext can occur after a call to onComplete if the final item and completion occur while a previous item is processing. Similarly, it is possible for onComplete to be called while an onNext task is running. This may not be an issue if your source never completes. If it is an issue, you would have to decide if you want to delay the call to onComplete until after the last item is processed or cancel the processing of the last item and adjust accordingly.

I made no attempt to test for any race conditions that may or may not occur when items come in right as a processing run completes.
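
For readers who want to narrow the race window mentioned above, here is a minimal sketch of the same "keep only the latest value while busy" idea, with the shared state guarded by a lock. It is an illustration rather than a tested implementation: the names SubscribeLatestAsync and LatestOnlyExtensions are hypothetical (they are not part of Rx.NET), exceptions thrown by the handler are simply forwarded to onError while processing continues, and disposing the subscription does not cancel a handler that is already running.

```csharp
using System;
using System.Threading.Tasks;

public static class LatestOnlyExtensions
{
    // Hypothetical helper, not part of Rx.NET: runs onNext for one value at a time
    // and, while a run is in progress, remembers only the most recent value.
    public static IDisposable SubscribeLatestAsync<T>(
        this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError)
    {
        var gate = new object();      // guards the three fields below
        bool processing = false;
        bool hasPending = false;
        T pending = default(T);

        async Task RunAsync(T value)
        {
            while (true)
            {
                try
                {
                    await onNext(value).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    onError(ex);      // assumption: report and keep going
                }

                lock (gate)
                {
                    if (!hasPending)
                    {
                        processing = false;   // nothing arrived meanwhile
                        return;
                    }
                    value = pending;          // take the latest cached value
                    pending = default(T);
                    hasPending = false;
                }
            }
        }

        return source.Subscribe(
            value =>
            {
                lock (gate)
                {
                    if (processing)
                    {
                        pending = value;      // overwrite any older cached value
                        hasPending = true;
                        return;
                    }
                    processing = true;
                }
                _ = RunAsync(value);          // fire and forget; errors go to onError
            },
            onError);
    }
}
```

Because the "is a run in progress?" check and the "cache this value" write happen under the same lock as the "is anything pending?" check at the end of a run, a value that arrives just as a run finishes is either picked up by that run's loop or starts a new run; it should not be lost. The ordering caveat about onComplete from the notes above still applies, since completion is not handled here.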

Gideon Engelberth

How about something like this:

public static IObservable<TResult> SelectAsync<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> projectAsync)
{
    return Observable.Create<TResult>(
        observer =>
        {
            var throttle = new BehaviorSubject<TResult>(default);

            var observable = source
                .Zip(throttle, (value, _) => value)
                .SelectMany(value => Observable.Defer(() => Observable.StartAsync(() => projectAsync(value))))
                .Publish();

            return new CompositeDisposable(
                observable.Subscribe(throttle),
                observable.Subscribe(observer),
                observable.Connect(),
                throttle
            );
        }
    );
}

In this extension method the Zip combined with the BehaviorSubject form a throttle within which items are queued until projectAsync is complete.

It can then be used as follows:

public static async Task<Unit> DoSomethingAsync(int value)
{
    Console.WriteLine($"[{Scheduler.Default.Now.DateTime.ToString("hh:mm:ss")}] Started processing value '{value}'");

    await Task.Delay(TimeSpan.FromSeconds(1));

    Console.WriteLine($"[{Scheduler.Default.Now.DateTime.ToString("hh:mm:ss")}] Completed processing value '{value}'");

    return Unit.Default;
}

public static async Task RunAsync()
{
    IObservable<int> source = Observable.Generate(0, value => value < 25, value => value + 1, value => value, value => TimeSpan.FromSeconds(0.1));

    await source
            .Do(value => Console.WriteLine($"[{Scheduler.Default.Now.DateTime.ToString("hh:mm:ss")}] Received value '{value}'"))
            .SelectAsync(value => DoSomethingAsync(value))
            .ToTask();
}

Here, a source observable is generated that emits 25 items at 100 ms intervals. The DoSomethingAsync method uses Task.Delay to simulate a 1 second processing delay. Running this code should result in output like the following:

[02:07:56] Received value '0'
[02:07:56] Started processing value '0'
[02:07:56] Received value '1'
[02:07:56] Received value '2'
[02:07:57] Received value '3'
[02:07:57] Received value '4'
[02:07:57] Received value '5'
[02:07:57] Received value '6'
[02:07:57] Received value '7'
[02:07:57] Received value '8'
[02:07:57] Received value '9'
[02:07:57] Completed processing value '0'
[02:07:57] Started processing value '1'
[02:07:57] Received value '10'
[02:07:57] Received value '11'
[02:07:58] Received value '12'
[02:07:58] Received value '13'
[02:07:58] Received value '14'
[02:07:58] Received value '15'
[02:07:58] Received value '16'
[02:07:58] Received value '17'
[02:07:58] Received value '18'
[02:07:58] Completed processing value '1'
[02:07:58] Started processing value '2'
[02:07:58] Received value '19'
[02:07:58] Received value '20'
[02:07:59] Received value '21'
[02:07:59] Received value '22'
[02:07:59] Received value '23'
[02:07:59] Received value '24'
[02:07:59] Completed processing value '2'
[02:07:59] Started processing value '3'
[02:08:00] Completed processing value '3'
[02:08:00] Started processing value '4'
[02:08:01] Completed processing value '4'
...
[02:08:20] Started processing value '23'
[02:08:21] Completed processing value '23'
[02:08:21] Started processing value '24'
[02:08:22] Completed processing value '24'

You should be aware that this code does not provide any means of back-pressure to the source. Should the source continually emit items faster than projectAsync can process them, memory pressure will build (via queuing within the Zip operator) until you receive an out of memory exception.

Furthermore, while I don't know the use case for this requirement, you might want to consider whether "System.Interactive.Async" or "System.Threading.Tasks.Dataflow" might be a better fit here.
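
As a rough illustration of what back-pressure could look like with System.Threading.Tasks.Dataflow, the sketch below feeds a bounded ActionBlock through SendAsync, so the producer waits whenever the block is full instead of queuing without limit. The class name BoundedDataflowSketch, the capacity of 2, and the 20 ms simulated delay are assumptions chosen for the example. Note that this processes every item in order; it does not drop intermediate values.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedDataflowSketch
{
    // Sends 10 values through a bounded block and returns how many were processed.
    public static async Task<int> RunAsync()
    {
        int processed = 0;

        // BoundedCapacity limits how many items the block holds at once
        // (including the one being processed); the default degree of
        // parallelism is 1, so items are handled one at a time.
        var worker = new ActionBlock<int>(
            async value =>
            {
                await Task.Delay(TimeSpan.FromMilliseconds(20)); // simulated work
                processed++;
            },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

        for (int value = 0; value < 10; value++)
        {
            // SendAsync completes only once the block has room,
            // which is what slows the producer down.
            await worker.SendAsync(value);
        }

        worker.Complete();          // signal that no more items will arrive
        await worker.Completion;    // wait for in-flight items to finish
        return processed;
    }
}
```

With an Rx source that cannot be slowed down, the producer side would need a different policy (for example, dropping or overwriting values), so treat this only as a picture of the bounded-queue idea.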

ibebbs
  • This is not what I expect. What I expect, based on your timeline, is 0 -> 9 -> 18 -> 24, discarding the values in between – Thaina Yu Aug 24 '20 at 04:40
  • Ok, then I'm afraid your description wasn't clear. You specifically stated: "I want to Throttle the input data, not skipping or waiting but throttling it." whereas if you only expect to process items 0, 9, 18 & 24 you are most certainly skipping. FYI, the above could easily be adapted to this use case via the Buffer and TakeLast operators. – ibebbs Aug 24 '20 at 11:38
  • Because `skip` is the name of an rx operator; it would skip 9 and continue with 10. `wait` is the name of another rx operator with yet another behaviour. `throttle` is the name of a third rx operator, which caches the last value ready to emit at the interval, and that is exactly the behaviour I need, albeit driven by a Task instead of an interval – Thaina Yu Aug 24 '20 at 11:49
  • Also, thanks for the suggestion of `buffer` and `takelast`, but that is the approach I don't want, because `buffer` will unnecessarily cache every value in a list while I only need the last one – Thaina Yu Aug 24 '20 at 11:51