I'm using Rx to ensure that our backend obeys to some third-party API's Request Limits.
The implementation below utilizes a simple Subject<T>
as input queue which is then tamed using James World's custom Pace operator.
This works, BUT only as long as throttledRequests
is not observed on the main-thread which is enforced by ObserveOn(TaskPoolScheduler.Default)
.
As soon as I comment out this line (Line 61), the program behaves as if the Pace
operator was not used at all and request get again processed as fast as they are queued up. Can anyone explain that behavior?
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
public static class ObservableExtensions
{
/// <summary>
/// James World's Pace operater (see https://stackoverflow.com/a/21589238/88513)
/// </summary>
public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
{
return source.Select(i => Observable.Empty<T>()
.Delay(interval)
.StartWith(i))
.Concat();
}
}
class Program
{
ISubject<int> requests;
IObservable<int> throttledRequests;
private Task<T> QueueRequest<T>(int work, Func<int, Task<T>> doWork)
{
var task = throttledRequests
.Where(x => x == work)
.Take(1)
.SelectMany(doWork)
.ToTask();
// queue it
requests.OnNext(work);
return task;
}
private Task<int> DoRequest(int x)
{
Console.WriteLine("{0:T}: DoRequest({1}) on TID {2}", DateTime.UtcNow, x, Thread.CurrentThread.ManagedThreadId);
return Task.FromResult(x);
}
private void Run()
{
// initialize request queue
requests = new Subject<int>();
// create a derived rate-limited queue
throttledRequests = requests
.Pace(TimeSpan.FromMilliseconds(1000))
.Publish()
.RefCount()
.ObserveOn(TaskPoolScheduler.Default);
Console.WriteLine("Main TID: {0}", Thread.CurrentThread.ManagedThreadId);
int i = 0;
while (true)
{
// Queue a number of requests
var tasks = Enumerable.Range(i * 10, 10)
.Select(x => QueueRequest(x, DoRequest))
.ToArray();
Task.WaitAll(tasks);
Console.ReadLine();
i++;
}
}
static void Main(string[] args)
{
new Program().Run();
}
}
}