I'm using Rx to ensure that our backend obeys a third-party API's request limits.

The implementation below uses a simple Subject<T> as an input queue, which is then tamed using James World's custom Pace operator.

This works, BUT only as long as throttledRequests is not observed on the main thread, which is enforced by ObserveOn(TaskPoolScheduler.Default).

As soon as I comment out this line (line 61), the program behaves as if the Pace operator were not used at all, and requests are again processed as fast as they are queued up. Can anyone explain this behavior?

using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    public static class ObservableExtensions
    {
        /// <summary>
        /// James World's Pace operator (see https://stackoverflow.com/a/21589238/88513)
        /// </summary>
        public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
        {
            return source.Select(i => Observable.Empty<T>()
                    .Delay(interval)
                    .StartWith(i))
                .Concat();
        }
    }

    class Program
    {
        ISubject<int> requests;
        IObservable<int> throttledRequests;

        private Task<T> QueueRequest<T>(int work, Func<int, Task<T>> doWork)
        {
            var task = throttledRequests
                .Where(x => x == work)
                .Take(1)
                .SelectMany(doWork)
                .ToTask();

            // queue it
            requests.OnNext(work);

            return task;
        }

        private Task<int> DoRequest(int x)
        {
            Console.WriteLine("{0:T}: DoRequest({1}) on TID {2}", DateTime.UtcNow, x, Thread.CurrentThread.ManagedThreadId);
            return Task.FromResult(x);
        }

        private void Run()
        {
            // initialize request queue
            requests = new Subject<int>();

            // create a derived rate-limited queue
            throttledRequests = requests
                .Pace(TimeSpan.FromMilliseconds(1000))
                .Publish()
                .RefCount()
                .ObserveOn(TaskPoolScheduler.Default);

            Console.WriteLine("Main TID: {0}", Thread.CurrentThread.ManagedThreadId);

            int i = 0;

            while (true)
            {
                // Queue a number of requests
                var tasks = Enumerable.Range(i * 10, 10)
                    .Select(x => QueueRequest(x, DoRequest))
                    .ToArray();

                Task.WaitAll(tasks);

                Console.ReadLine();
                i++;
            }
        }

        static void Main(string[] args)
        {
            new Program().Run();
        }
    }
}
Oliver Weichhold
  • You should try to find out what specific scheduler is used when you comment out `.ObserveOn(TaskPoolScheduler.Default)`. I suspect it'll be either the current thread or immediate scheduler. That should then tell you what is happening. – Enigmativity Sep 21 '15 at 09:14
  • Is there any reason why you are mixing Rx and TPL so much? – Enigmativity Sep 21 '15 at 09:15
  • @Enigmativity When I comment out that line, DoRequest executes on the main thread (as expected). I have modified the console output of the program to make this more obvious. – Oliver Weichhold Sep 21 '15 at 09:17
  • @Enigmativity Is this really such a weird mix of TPL and Rx? I think Rx is the right tech for the throttling part, and processing web requests is a good fit for the TPL. – Oliver Weichhold Sep 21 '15 at 09:22
  • It's advisable to stick to one or the other. TPL can force the execution of Rx queries, and changing between them can then introduce deadlocks. Always switch out at the last possible point if you have to. – Enigmativity Sep 21 '15 at 13:14
  • Your `Pace` method should take an `IScheduler` argument and then pass that into `Delay`. Otherwise you're just implicitly using the default scheduler. – Timothy Shields Sep 22 '15 at 15:47
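
A minimal sketch of what the last comment suggests, assuming the System.Reactive NuGet package is referenced. The class name `PacedObservableExtensions` is made up for illustration; the only change from the question's operator is the extra `IScheduler` parameter, which is forwarded to the `Delay` overload that accepts a scheduler.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class PacedObservableExtensions
{
    // Same shape as the question's Pace operator, but the caller chooses
    // the scheduler that times the delay instead of relying on the default.
    public static IObservable<T> Pace<T>(
        this IObservable<T> source, TimeSpan interval, IScheduler scheduler)
    {
        return source.Select(i => Observable.Empty<T>()
                .Delay(interval, scheduler) // completion is delayed on the given scheduler
                .StartWith(i))              // the item itself is emitted first
            .Concat();                      // the next item waits for the previous delay
    }
}
```

A call site might then read `requests.Pace(TimeSpan.FromMilliseconds(1000), TaskPoolScheduler.Default)`, which makes the timing scheduler explicit. This sketch only illustrates the comment; it is not claimed to change the behavior described in the question.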

1 Answer

I cannot answer the question in full (I'm not sure why it behaves the way it does on TaskPoolScheduler), but I'll give you my thoughts and show how to fix it so that it runs as expected with or without TaskPoolScheduler.

First, you might notice that even on TaskPoolScheduler it does not work correctly: often the first 1-3 items get processed without any delay. Why they start being processed with a delay after that is still not clear to me. Now to the reason. Consider the following sample code:

var result = Observable.Range(0, 10).Delay(TimeSpan.FromSeconds(10)).StartWith(1).Take(1).ToTask().Result;

Here there will be no delay, and the task will complete immediately. Why? Because StartWith immediately injects "1" at the beginning of the sequence, and Take(1) then takes this value and completes. There is no reason to continue with the sequence, so the delay is never executed. If you use Take(2) instead, for example, it will delay for 10 seconds before completing.
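
The comparison above can be sketched as a small console program, assuming the System.Reactive NuGet package is referenced. The class name `StartWithTakeDemo` and the `Measure` helper are made up for illustration, and the delay is shortened to 2 seconds so the run finishes quickly.

```csharp
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;

static class StartWithTakeDemo
{
    // Returns how long the query takes to complete when only the
    // first `count` items are taken.
    public static TimeSpan Measure(int count)
    {
        var stopwatch = Stopwatch.StartNew();
        Observable.Range(0, 10)
            .Delay(TimeSpan.FromSeconds(2)) // delays the ten Range items
            .StartWith(1)                   // emitted immediately, ahead of the delayed items
            .Take(count)
            .ToTask()
            .Wait();
        return stopwatch.Elapsed;
    }

    static void Main()
    {
        // Take(1) is satisfied by the StartWith value alone.
        Console.WriteLine("Take(1): {0}", Measure(1));
        // Take(2) also needs the first delayed item.
        Console.WriteLine("Take(2): {0}", Measure(2));
    }
}
```

The first measurement should be close to zero and the second roughly 2 seconds, matching the explanation above.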

For exactly the same reason, your code never enters the delay (you can verify that with a debugger, or by adding a Select after Delay and logging to the console, for example). To fix it, just remove Take(1) (or change it to Take(2), for example); there is only ever one item for each key anyway. When you do that, the code runs correctly with or without TaskPoolScheduler.
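
One way to check whether the delay is ever reached is sketched below, assuming the System.Reactive NuGet package is referenced. `PaceLogged` and its `onDelayElapsed` callback are hypothetical names, not part of the original operator. Because `Observable.Empty<T>()` emits no items, this variant hooks the completion notification with `Do` rather than projecting items with `Select`.

```csharp
using System;
using System.Reactive.Linq;

public static class PaceDiagnostics
{
    // Hypothetical diagnostic variant of Pace: invokes onDelayElapsed(item)
    // when the delay that follows that item has actually run to completion.
    public static IObservable<T> PaceLogged<T>(
        this IObservable<T> source, TimeSpan interval, Action<T> onDelayElapsed)
    {
        return source.Select(i => Observable.Empty<T>()
                .Delay(interval)
                // Empty yields no items, so only the completion handler fires.
                .Do(_ => { }, () => onDelayElapsed(i))
                .StartWith(i))
            .Concat();
    }
}
```

It could be swapped in for `Pace` with a callback such as `x => Console.WriteLine("delay elapsed after {0}", x)`. If that message never appears for a request, the delay for that item was abandoned before it finished.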

Evk