I am trying to build a service that provides a queue for many asynchronous clients to make requests and await a response. I need to be able to throttle the queue processing by X requests per Y duration. For example: 50 web requests per second. It is for a 3rd party REST Service where I can only issue X requests per second.
Found many SO questions, it is lead me down the path of using TPL Dataflow, I've used a TranformBlock to provide my custom throttling and then X number of ActionBlocks to complete the tasks in parallel. The implementation of the Action seems a bit clunky, so wondering if there is a better way for me to pass Tasks into the pipeline that notify the callers once completed.
I'm wondering if there is there a better or more optimal/simpler way to do what I want? Is there any glaring issues with my implementation? I know it is missing cancellation and exception handing and I'll be doing this next, but your comments are most welcomed.
I've Extended Stephen Cleary's example for my Dataflow pipeline and used
svick's concept of a time throttled TransformBlock. I am wondering if what I've built could be easily achieved with a pure SemaphoreSlim design, its the time based throttling with max operations that I think will complicate things.
Here is the latest implementation. FIFO queue async queue where I can pass in custom actions.
public class ThrottledProducerConsumer<T>
{
private class TimerState<T1>
{
public SemaphoreSlim Sem;
public T1 Value;
}
private BufferBlock<T> _queue;
private IPropagatorBlock<T, T> _throttleBlock;
private List<Task> _consumers;
private static IPropagatorBlock<T1, T1> CreateThrottleBlock<T1>(TimeSpan Interval, Int32 MaxPerInterval)
{
SemaphoreSlim _sem = new SemaphoreSlim(MaxPerInterval);
return new TransformBlock<T1, T1>(async (x) =>
{
var sw = new Stopwatch();
sw.Start();
//Console.WriteLine($"Current count: {_sem.CurrentCount}");
await _sem.WaitAsync();
sw.Stop();
var now = DateTime.UtcNow;
var releaseTime = now.Add(Interval) - now;
//-- Using timer as opposed to Task.Delay as I do not want to await or wait for it to complete
var tm = new Timer((s) => {
var state = (TimerState<T1>)s;
//Console.WriteLine($"RELEASE: {state.Value} was released {DateTime.UtcNow:mm:ss:ff} Reset Sem");
state.Sem.Release();
}, new TimerState<T1> { Sem = _sem, Value = x }, (int)Interval.TotalMilliseconds,
-1);
/*
Task.Delay(delay).ContinueWith((t)=>
{
Console.WriteLine($"RELEASE(FAKE): {x} was released {DateTime.UtcNow:mm:ss:ff} Reset Sem");
//_sem.Release();
});
*/
//Console.WriteLine($"{x} was tramsformed in {sw.ElapsedMilliseconds}ms. Will release {now.Add(Interval):mm:ss:ff}");
return x;
},
//new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
//
new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 10 });
}
public ThrottledProducerConsumer(TimeSpan Interval, int MaxPerInterval, Int32 QueueBoundedMax = 5, Action<T> ConsumerAction = null, Int32 MaxConsumers = 1)
{
var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1, };
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, };
//-- Create the Queue
_queue = new BufferBlock<T>(new DataflowBlockOptions { BoundedCapacity = QueueBoundedMax, });
//-- Create and link the throttle block
_throttleBlock = CreateThrottleBlock<T>(Interval, MaxPerInterval);
_queue.LinkTo(_throttleBlock, linkOptions);
//-- Create and link the consumer(s) to the throttle block
var consumerAction = (ConsumerAction != null) ? ConsumerAction : new Action<T>(ConsumeItem);
_consumers = new List<Task>();
for (int i = 0; i < MaxConsumers; i++)
{
var consumer = new ActionBlock<T>(consumerAction, consumerOptions);
_throttleBlock.LinkTo(consumer, linkOptions);
_consumers.Add(consumer.Completion);
}
//-- TODO: Add some cancellation tokens to shut this thing down
}
/// <summary>
/// Default Consumer Action, just prints to console
/// </summary>
/// <param name="ItemToConsume"></param>
private void ConsumeItem(T ItemToConsume)
{
Console.WriteLine($"Consumed {ItemToConsume} at {DateTime.UtcNow}");
}
public async Task EnqueueAsync(T ItemToEnqueue)
{
await this._queue.SendAsync(ItemToEnqueue);
}
public async Task EnqueueItemsAsync(IEnumerable<T> ItemsToEnqueue)
{
foreach (var item in ItemsToEnqueue)
{
await this._queue.SendAsync(item);
}
}
public async Task CompleteAsync()
{
this._queue.Complete();
await Task.WhenAll(_consumers);
Console.WriteLine($"All consumers completed {DateTime.UtcNow}");
}
}
The test method
public class WorkItem<T>
{
public TaskCompletionSource<T> tcs;
//public T respone;
public string url;
public WorkItem(string Url)
{
tcs = new TaskCompletionSource<T>();
url = Url;
}
public override string ToString()
{
return $"{url}";
}
}
public static void TestQueue()
{
Console.WriteLine("Created the queue");
var defaultAction = new Action<WorkItem<String>>(async i => {
var taskItem = ((WorkItem<String>)i);
Console.WriteLine($"Consuming: {taskItem.url} {DateTime.UtcNow:mm:ss:ff}");
//-- Assume calling another async method e.g. await httpClient.DownloadStringTaskAsync(url);
await Task.Delay(5000);
taskItem.tcs.SetResult($"{taskItem.url}");
//Console.WriteLine($"Consumed: {taskItem.url} {DateTime.UtcNow}");
});
var queue = new ThrottledProducerConsumer<WorkItem<String>>(TimeSpan.FromMilliseconds(2000), 5, 2, defaultAction);
var results = new List<Task>();
foreach (var no in Enumerable.Range(0, 20))
{
var workItem = new WorkItem<String>($"http://someurl{no}.com");
results.Add(queue.EnqueueAsync(workItem));
results.Add(workItem.tcs.Task);
results.Add(workItem.tcs.Task.ContinueWith(response =>
{
Console.WriteLine($"Received: {response.Result} {DateTime.UtcNow:mm:ss:ff}");
}));
}
Task.WhenAll(results).Wait();
Console.WriteLine("All Work Items Have Been Processed");
}