3

I am trying to build a service that provides a queue for many asynchronous clients to make requests and await a response. I need to be able to throttle the queue processing by X requests per Y duration. For example: 50 web requests per second. It is for a 3rd party REST Service where I can only issue X requests per second.

I found many SO questions, which led me down the path of using TPL Dataflow. I've used a TransformBlock to provide my custom throttling and then X number of ActionBlocks to complete the tasks in parallel. The implementation of the Action seems a bit clunky, so I'm wondering if there is a better way for me to pass Tasks into the pipeline that notify the callers once completed.

I'm wondering if there is a better or more optimal/simpler way to do what I want. Are there any glaring issues with my implementation? I know it is missing cancellation and exception handling, and I'll be doing this next, but your comments are most welcome.

I've extended Stephen Cleary's example for my Dataflow pipeline and used
svick's concept of a time-throttled TransformBlock. I am wondering if what I've built could be easily achieved with a pure SemaphoreSlim design; it's the time-based throttling with max operations that I think will complicate things.

Here is the latest implementation: a FIFO async queue where I can pass in custom actions.

/// <summary>
/// FIFO asynchronous queue that hands items to one or more consumers while letting at most
/// <c>MaxPerInterval</c> items through per <c>Interval</c>.
/// Pipeline: bounded <see cref="BufferBlock{T}"/> -> throttling <see cref="TransformBlock{TInput,TOutput}"/>
/// -> N <see cref="ActionBlock{TInput}"/> consumers.
/// </summary>
/// <typeparam name="T">Type of the queued work items.</typeparam>
public class ThrottledProducerConsumer<T>
{
    private readonly BufferBlock<T> _queue;
    private readonly IPropagatorBlock<T, T> _throttleBlock;
    private readonly List<Task> _consumers;

    /// <summary>
    /// Builds a pass-through block that takes one semaphore slot per item and gives the slot
    /// back <paramref name="Interval"/> later, so no more than <paramref name="MaxPerInterval"/>
    /// items leave the block within any window of that length.
    /// </summary>
    /// <param name="Interval">Length of the throttling window.</param>
    /// <param name="MaxPerInterval">Number of items allowed through per window.</param>
    private static IPropagatorBlock<T1, T1> CreateThrottleBlock<T1>(TimeSpan Interval, Int32 MaxPerInterval)
    {
        // The max count makes an accidental double release fail loudly instead of
        // silently widening the throttle.
        var slots = new SemaphoreSlim(MaxPerInterval, MaxPerInterval);

        return new TransformBlock<T1, T1>(async item =>
        {
            await slots.WaitAsync().ConfigureAwait(false);

            // Fire-and-forget release. A System.Threading.Timer that nobody references may be
            // garbage collected before it fires, which would leak the slot forever; the delay
            // task is kept alive by the runtime until it completes.
            var ignored = Task.Delay(Interval).ContinueWith(
                t => { slots.Release(); },
                TaskScheduler.Default);

            return item;
        },
        new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 10 });
    }

    /// <summary>
    /// Creates and links the queue, the throttle block and the consumers.
    /// </summary>
    /// <param name="Interval">Length of the throttling window.</param>
    /// <param name="MaxPerInterval">Maximum number of items released per window.</param>
    /// <param name="QueueBoundedMax">Capacity of the input queue; enqueueing waits while it is full.</param>
    /// <param name="ConsumerAction">Action run for every item; defaults to printing the item to the console.</param>
    /// <param name="MaxConsumers">Number of consumer blocks linked to the throttle block.</param>
    public ThrottledProducerConsumer(TimeSpan Interval, int MaxPerInterval, Int32 QueueBoundedMax = 5, Action<T> ConsumerAction = null, Int32 MaxConsumers = 1)
    {
        // BoundedCapacity = 1 makes a busy consumer decline new items so they go to an idle one.
        var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        //-- Create the Queue
        _queue = new BufferBlock<T>(new DataflowBlockOptions { BoundedCapacity = QueueBoundedMax });

        //-- Create and link the throttle block
        _throttleBlock = CreateThrottleBlock<T>(Interval, MaxPerInterval);
        _queue.LinkTo(_throttleBlock, linkOptions);

        //-- Create and link the consumer(s) to the throttle block
        var consumerAction = ConsumerAction ?? new Action<T>(ConsumeItem);
        _consumers = new List<Task>();
        for (int i = 0; i < MaxConsumers; i++)
        {
            var consumer = new ActionBlock<T>(consumerAction, consumerOptions);
            _throttleBlock.LinkTo(consumer, linkOptions);
            _consumers.Add(consumer.Completion);
        }

        //-- TODO: Add some cancellation tokens to shut this thing down
    }

    /// <summary>
    /// Default Consumer Action, just prints to console
    /// </summary>
    /// <param name="ItemToConsume">Item that came out of the throttle block.</param>
    private void ConsumeItem(T ItemToConsume)
    {
        Console.WriteLine($"Consumed {ItemToConsume} at {DateTime.UtcNow}");
    }

    /// <summary>
    /// Adds one item to the queue, waiting asynchronously while the queue is full.
    /// </summary>
    /// <param name="ItemToEnqueue">Item to queue.</param>
    /// <exception cref="InvalidOperationException">The queue declined the item (for example after <see cref="CompleteAsync"/>).</exception>
    public async Task EnqueueAsync(T ItemToEnqueue)
    {
        // SendAsync reports rejection through its result; ignoring it would drop the item silently.
        if (!await _queue.SendAsync(ItemToEnqueue).ConfigureAwait(false))
        {
            throw new InvalidOperationException("The queue no longer accepts items.");
        }
    }

    /// <summary>
    /// Adds the items to the queue one by one, in enumeration order.
    /// </summary>
    /// <param name="ItemsToEnqueue">Items to queue.</param>
    /// <exception cref="ArgumentNullException"><paramref name="ItemsToEnqueue"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The queue declined an item.</exception>
    public async Task EnqueueItemsAsync(IEnumerable<T> ItemsToEnqueue)
    {
        if (ItemsToEnqueue == null)
        {
            throw new ArgumentNullException(nameof(ItemsToEnqueue));
        }

        foreach (var item in ItemsToEnqueue)
        {
            await EnqueueAsync(item).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Marks the queue as complete and waits until every consumer block has finished.
    /// </summary>
    public async Task CompleteAsync()
    {
        _queue.Complete();
        await Task.WhenAll(_consumers).ConfigureAwait(false);
        Console.WriteLine($"All consumers completed {DateTime.UtcNow}");
    }
}

The test method

    /// <summary>
    /// A unit of work for the queue: the URL to process plus the completion source
    /// through which the consumer hands the result back to the caller.
    /// </summary>
    /// <typeparam name="T">Type of the result produced for the URL.</typeparam>
    public class WorkItem<T>
    {
        /// <summary>Completed by the consumer; callers await <c>tcs.Task</c>.</summary>
        public TaskCompletionSource<T> tcs;

        /// <summary>URL this work item represents.</summary>
        public string url;

        /// <summary>Creates a pending work item for <paramref name="Url"/>.</summary>
        /// <param name="Url">URL to process.</param>
        public WorkItem(string Url)
        {
            // Run caller continuations asynchronously so SetResult does not execute them inline
            // on the consumer that completes the item.
            tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
            url = Url;
        }

        /// <summary>Returns the URL, or an empty string when no URL was supplied.</summary>
        public override string ToString()
        {
            return url ?? string.Empty;
        }
    }

    /// <summary>
    /// Console demo: queues 20 work items through a pipeline throttled to 5 items per
    /// 2 seconds and blocks until every item has been enqueued, completed and reported.
    /// </summary>
    public static void TestQueue()
    {
        // NOTE: the pipeline only accepts Action<T>, so this async lambda is "async void".
        // The ActionBlock considers an item done at the first await, and an exception that
        // escapes the lambda cannot be observed by anyone, so every failure is routed
        // to the work item's task instead.
        var defaultAction = new Action<WorkItem<String>>(async taskItem =>
        {
            try
            {
                Console.WriteLine($"Consuming: {taskItem.url} {DateTime.UtcNow:mm:ss:ff}");
                //-- Assume calling another async method e.g. await httpClient.DownloadStringTaskAsync(url);
                await Task.Delay(5000);
                taskItem.tcs.SetResult($"{taskItem.url}");
            }
            catch (Exception ex)
            {
                // Try* because the task may already be completed if SetResult itself was the failure.
                taskItem.tcs.TrySetException(ex);
            }
        });

        var queue = new ThrottledProducerConsumer<WorkItem<String>>(TimeSpan.FromMilliseconds(2000), 5, 2, defaultAction);
        Console.WriteLine("Created the queue");

        // Collects, per item: the enqueue task, the result task and the reporting continuation.
        var results = new List<Task>();
        foreach (var no in Enumerable.Range(0, 20))
        {
            var workItem = new WorkItem<String>($"http://someurl{no}.com");
            results.Add(queue.EnqueueAsync(workItem));
            results.Add(workItem.tcs.Task);
            results.Add(workItem.tcs.Task.ContinueWith(response =>
            {
                Console.WriteLine($"Received: {response.Result} {DateTime.UtcNow:mm:ss:ff}");
            }));
        }

        // Blocking is intentional here: this is a synchronous console entry point.
        Task.WhenAll(results).Wait();
        Console.WriteLine("All Work Items Have Been Processed");
    }
Community
  • 1
  • 1
Nicholas
  • 572
  • 6
  • 17
  • 1
    Some thoughts: 1) You're in C# and mention web requests; IIS pretty-much does this already (including throttling, although admittedly on the input rather than output side). 2) Perhaps something as simple as a ConcurrentQueue is all you need to handle the concurrency, with a SemaphoreSlim for the throttling. 3) If you're eventually going to scale out on this, particularly to multiple machines servicing requests, a service bus might be a better option? Throttling would be more difficult, though. – sellotape Oct 05 '16 at 08:10
  • Why not add another block at the end of the queue, which will notify the caller about the handled request? – VMAtm Oct 05 '16 at 14:36
  • @VMAtm I guess that is a way, however I was thinking about passing in an action delegate or even a task into the queue that would notify the caller upon completion. – Nicholas Oct 05 '16 at 23:10
  • Closures are very memory consuming, so you probably shouldn't use them in such case. Action block which determines by key how it should notify caller is much preferable, as for me. – VMAtm Oct 05 '16 at 23:20
  • @sellotape ideally I'd move this into a service of its own, but I need a quick win that will allow me to create a queue of requests that I can keep in memory (cache and initialise upon each app domain recycle). I do not have much experience with these objects so it's a bit of learning on my part. The Dataflow solution above does appear to do what I want with FIFO, and the other advantage is executing the requests in parallel, but I've not benchmarked anything yet. Do you have any further reasons why I shouldn't go with the above? – Nicholas Oct 05 '16 at 23:22
  • @VMAtm Oh, I didn't realise this. When you are referring to closures, are you talking about tasks? The only thing I was thinking of with the task is continuation and cancellation; I could include a timeout with the request, so if it reaches the execution point and the client is no longer there I could discard the request. – Nicholas Oct 05 '16 at 23:26
  • I thought that you going to use the [closures](http://csharpindepth.com/Articles/Chapter5/Closures.aspx) in tasks, which can degrade performance, only this I meant. So you should measure your solution – VMAtm Oct 06 '16 at 00:53
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/125043/discussion-between-nicholas-and-vmatm). – Nicholas Oct 06 '16 at 02:13

1 Answers1

2

Since asking, I have created a ThrottledConsumerProducer class based on TPL Dataflow. It was tested over a number of days, which included concurrent producers whose items were queued and completed in order (approx. 281k) without any problems; however, there may be bugs I've not discovered.

  1. I am using a BufferBlock as an asynchronous queue, this is linked to:
  2. A TransformBlock which provides the throttling and blocking I need. It is used in conjunction with a SemaphoreSlim to control the max requests. As each item is passed through the block, it takes a slot from the semaphore and schedules a task to run X duration later to release that slot. This way I have a sliding window of X requests per duration; exactly what I wanted. Because of TPL I am also leveraging parallelism to the connected:
  3. ActionBlock(s) which are responsible for performing the task I need.

The classes are generic, so it might be useful to others if they need something similar. I have not written cancellation or error handling, but thought I should just mark this as answered to move it along. I would be quite happy to see some alternatives and feedback, rather than mark mine as an accepted answer. Thanks for reading.

NOTE: I removed the Timer from the original implementation as it was doing weird stuff, causing the semaphore to release more than the maximum. I am assuming it is a dynamic context error; it occurred when I started running concurrent requests. I worked around it by using Task.Delay to schedule the release of a semaphore slot.

Throttled Producer Consumer

/// <summary>
/// FIFO asynchronous queue that hands items to one or more consumers while letting at most
/// <c>MaxPerInterval</c> items through per <c>Interval</c>.
/// Pipeline: bounded <see cref="BufferBlock{T}"/> -> throttling <see cref="TransformBlock{TInput,TOutput}"/>
/// -> N <see cref="ActionBlock{TInput}"/> consumers.
/// </summary>
/// <typeparam name="T">Type of the queued work items.</typeparam>
public class ThrottledProducerConsumer<T>
{
    private readonly BufferBlock<T> _queue;
    private readonly IPropagatorBlock<T, T> _throttleBlock;
    private readonly List<Task> _consumers;

    /// <summary>
    /// Builds a pass-through block that takes one semaphore slot per item and gives the slot
    /// back <paramref name="Interval"/> later, so no more than <paramref name="MaxPerInterval"/>
    /// items leave the block within any window of that length.
    /// </summary>
    /// <param name="Interval">Length of the throttling window.</param>
    /// <param name="MaxPerInterval">Number of items allowed through per window.</param>
    /// <param name="BlockBoundedMax">Bounded capacity of the throttle block.</param>
    /// <param name="BlockMaxDegreeOfParallelism">Number of items the throttle block handles concurrently.</param>
    private static IPropagatorBlock<T1, T1> CreateThrottleBlock<T1>(TimeSpan Interval,
        Int32 MaxPerInterval, Int32 BlockBoundedMax = 2, Int32 BlockMaxDegreeOfParallelism = 2)
    {
        var slots = new SemaphoreSlim(MaxPerInterval, MaxPerInterval);

        return new TransformBlock<T1, T1>(async item =>
        {
            await slots.WaitAsync().ConfigureAwait(false);

            // Fire-and-forget: the slot is returned one interval after the item was let through.
            // The scheduler is passed explicitly so the release never depends on the ambient one.
            var ignored = Task.Delay(Interval).ContinueWith(
                t => { slots.Release(); },
                TaskScheduler.Default);

            return item;
        },
        //-- Might be better to keep Bounded Capacity in sync with the semaphore
        new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = BlockBoundedMax,
            MaxDegreeOfParallelism = BlockMaxDegreeOfParallelism
        });
    }

    /// <summary>
    /// Creates and links the queue, the throttle block and the consumers.
    /// </summary>
    /// <param name="Interval">Length of the throttling window.</param>
    /// <param name="MaxPerInterval">Maximum number of items released per window.</param>
    /// <param name="QueueBoundedMax">Capacity of the input queue; enqueueing waits while it is full.</param>
    /// <param name="ConsumerAction">Action run for every item; defaults to logging the item.</param>
    /// <param name="MaxConsumers">Number of consumer blocks linked to the throttle block.</param>
    /// <param name="MaxThrottleBuffer">Bounded capacity of the throttle block.</param>
    /// <param name="MaxDegreeOfParallelism">Degree of parallelism of the throttle block.</param>
    public ThrottledProducerConsumer(TimeSpan Interval, int MaxPerInterval,
        Int32 QueueBoundedMax = 5, Action<T> ConsumerAction = null, Int32 MaxConsumers = 1,
        Int32 MaxThrottleBuffer = 20, Int32 MaxDegreeOfParallelism = 10)
    {
        //-- Probably best to link MaxPerInterval and MaxThrottleBuffer
        //  and MaxConsumers with MaxDegreeOfParallelism
        // BoundedCapacity = 1 makes a busy consumer decline new items so they go to an idle one.
        var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        //-- Create the Queue
        _queue = new BufferBlock<T>(new DataflowBlockOptions { BoundedCapacity = QueueBoundedMax });

        //-- Create and link the throttle block. The buffer size and parallelism requested by
        //   the caller are forwarded; previously they were accepted but never used.
        _throttleBlock = CreateThrottleBlock<T>(Interval, MaxPerInterval, MaxThrottleBuffer, MaxDegreeOfParallelism);
        _queue.LinkTo(_throttleBlock, linkOptions);

        //-- Create and link the consumer(s) to the throttle block
        var consumerAction = ConsumerAction ?? new Action<T>(ConsumeItem);
        _consumers = new List<Task>();
        for (int i = 0; i < MaxConsumers; i++)
        {
            var consumer = new ActionBlock<T>(consumerAction, consumerOptions);
            _throttleBlock.LinkTo(consumer, linkOptions);
            _consumers.Add(consumer.Completion);
        }

        //-- TODO: Add some cancellation tokens to shut this thing down
    }

    /// <summary>
    /// Default Consumer Action, just logs the item
    /// </summary>
    /// <param name="ItemToConsume">Item that came out of the throttle block.</param>
    private void ConsumeItem(T ItemToConsume)
    {
        Log($"Consumed {ItemToConsume} at {DateTime.UtcNow}");
    }

    /// <summary>
    /// Adds one item to the queue, waiting asynchronously while the queue is full.
    /// </summary>
    /// <param name="ItemToEnqueue">Item to queue.</param>
    /// <exception cref="InvalidOperationException">The queue declined the item (for example after <see cref="CompleteAsync"/>).</exception>
    public async Task EnqueueAsync(T ItemToEnqueue)
    {
        // SendAsync reports rejection through its result; ignoring it would drop the item silently.
        if (!await _queue.SendAsync(ItemToEnqueue).ConfigureAwait(false))
        {
            throw new InvalidOperationException("The queue no longer accepts items.");
        }
    }

    /// <summary>
    /// Adds the items to the queue one by one, in enumeration order.
    /// </summary>
    /// <param name="ItemsToEnqueue">Items to queue.</param>
    /// <exception cref="ArgumentNullException"><paramref name="ItemsToEnqueue"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The queue declined an item.</exception>
    public async Task EnqueueItemsAsync(IEnumerable<T> ItemsToEnqueue)
    {
        if (ItemsToEnqueue == null)
        {
            throw new ArgumentNullException(nameof(ItemsToEnqueue));
        }

        foreach (var item in ItemsToEnqueue)
        {
            await EnqueueAsync(item).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Marks the queue as complete and waits until every consumer block has finished.
    /// </summary>
    public async Task CompleteAsync()
    {
        _queue.Complete();
        await Task.WhenAll(_consumers).ConfigureAwait(false);
        Log($"All consumers completed {DateTime.UtcNow}");
    }

    /// <summary>Writes the message to the trace listeners and to the console.</summary>
    /// <param name="messageToLog">Text to write.</param>
    private static void Log(String messageToLog)
    {
        System.Diagnostics.Trace.WriteLine(messageToLog);
        Console.WriteLine(messageToLog);
    }

}

- Example Usage -

A Generic WorkItem

/// <summary>
/// Pairs the input of a queued request with the task through which its result
/// (or failure) is reported back to the caller.
/// </summary>
/// <typeparam name="Toutput">Type of the result.</typeparam>
/// <typeparam name="Tinput">Type of the request data.</typeparam>
public class WorkItem<Toutput,Tinput>
{
    private readonly TaskCompletionSource<Toutput> _tcs;

    /// <summary>Task that finishes when <see cref="Complete"/> or <see cref="Failed"/> is called.</summary>
    public Task<Toutput> Task { get { return _tcs.Task; } }

    /// <summary>Request data supplied at construction.</summary>
    public Tinput InputData { get; private set; }

    /// <summary>
    /// Result passed to <see cref="Complete"/>, or the default value while the item is
    /// pending or after it failed.
    /// </summary>
    public Toutput OutputData
    {
        // Read straight from the task so the value can never disagree with Task.Result.
        get
        {
            return _tcs.Task.Status == TaskStatus.RanToCompletion
                ? _tcs.Task.Result
                : default(Toutput);
        }
    }

    /// <summary>Creates a pending work item.</summary>
    /// <param name="inputData">Request data to carry through the pipeline.</param>
    public WorkItem(Tinput inputData)
    {
        // Run caller continuations asynchronously so Complete/Failed do not execute them
        // inline on the consumer that finishes the item.
        _tcs = new TaskCompletionSource<Toutput>(TaskCreationOptions.RunContinuationsAsynchronously);
        InputData = inputData;
    }

    /// <summary>Finishes the item successfully.</summary>
    /// <param name="result">Value delivered to whoever awaits <see cref="Task"/>.</param>
    /// <exception cref="InvalidOperationException">The item was already completed or failed.</exception>
    public void Complete(Toutput result)
    {
        _tcs.SetResult(result);
    }

    /// <summary>Finishes the item with an error.</summary>
    /// <param name="ex">Exception delivered to whoever awaits <see cref="Task"/>.</param>
    /// <exception cref="InvalidOperationException">The item was already completed or failed.</exception>
    public void Failed(Exception ex)
    {
        _tcs.SetException(ex);
    }

    /// <summary>Returns the text form of <see cref="InputData"/>, or an empty string when it is null.</summary>
    public override string ToString()
    {
        return InputData == null ? string.Empty : InputData.ToString();
    }
}

Creating the action block executed in the pipeline

    /// <summary>
    /// Builds the consumer action for the pipeline: it simulates a 500 ms lookup for the
    /// work item's point and completes the item with a <c>Location</c> echoing the input
    /// coordinates and the elapsed time.
    /// </summary>
    /// <returns>The action to pass to the queue as its consumer.</returns>
    private Action<WorkItem<Location,PointToLocation>> CreateProcessingAction()
    {
        // NOTE: converting an async lambda to Action makes it "async void": nobody can observe
        // an exception that escapes it, so failures are reported through the work item instead.
        return new Action<WorkItem<Location,PointToLocation>>(async taskItem =>
        {
            Location outData;
            try
            {
                var sw = Stopwatch.StartNew();
                var inputData = taskItem.InputData;

                //-- Assume calling another async method e.g. await httpClient.DownloadStringTaskAsync(url);
                await Task.Delay(500);
                sw.Stop();

                outData = new Location()
                {
                    Latitude = inputData.Latitude,
                    Longitude = inputData.Longitude,
                    StreetAddress = $"Consumed: {inputData.Latitude},{inputData.Longitude} Duration(ms): {sw.ElapsedMilliseconds}"
                };
            }
            catch (Exception ex)
            {
                // Without this the caller awaiting the work item would wait forever.
                taskItem.Failed(ex);
                return;
            }

            taskItem.Complete(outData);
        });

    }

Test method: you'll need to provide your own implementations of PointToLocation and Location. This is just an example of how you'd use it with your own classes.

    // First value of the next batch, and the number of items per batch.
    int startRange = 0;
    int nextRange = 1000;
    // Created on the first click and reused by later clicks.
    ThrottledProducerConsumer<WorkItem<Location,PointToLocation>> tpc;

    /// <summary>
    /// Click handler: creates the pipeline on first use, builds the next batch of
    /// <c>nextRange</c> work items, attaches a continuation to each and queues the batch.
    /// </summary>
    private async void cmdTestPipeline_Click(object sender, EventArgs e)
    {
        Log($"Pipeline test started {DateTime.Now:HH:mm:ss:ff}");

        if(tpc == null)
        {
            tpc = new ThrottledProducerConsumer<WorkItem<Location, PointToLocation>>(
                //1010, 2, 20000,
                TimeSpan.FromMilliseconds(1010), 45, 100000,
                CreateProcessingAction(),
                2,45,10);
        }

        var workItems = new List<WorkItem<Location, PointToLocation>>();
        // Enumerable.Range takes (start, count): nextRange is the batch size.
        foreach (var i in Enumerable.Range(startRange, nextRange))
        {
            var ptToLoc = new PointToLocation() { Latitude = i + 101, Longitude = i + 100 };
            var wrkItem = new WorkItem<Location, PointToLocation>(ptToLoc);
            workItems.Add(wrkItem);


            wrkItem.Task.ContinueWith(t =>
            {
                var loc = t.Result;
                string line = $"[Simulated:{DateTime.Now:HH:mm:ss:ff}] - {loc.StreetAddress}";
                //txtResponse.Text = String.Concat(txtResponse.Text, line, System.Environment.NewLine);
                //var lines = txtResponse.Text.Split(new string[] { System.Environment.NewLine},
                //    StringSplitOptions.RemoveEmptyEntries).LongCount();

                //lblLines.Text = lines.ToString();
                //Log(line);

            });
            //}, TaskScheduler.FromCurrentSynchronizationContext());

        }

        // Advance before awaiting so a second click during the enqueue gets a fresh range.
        startRange += nextRange;

        try
        {
            // Await the enqueue so a failure is reported instead of being dropped with the task.
            await tpc.EnqueueItemsAsync(workItems);
        }
        catch (Exception ex)
        {
            // An exception escaping an async void handler cannot be caught by the caller.
            Log($"Pipeline test failed {DateTime.Now:HH:mm:ss:ff}: {ex.Message}");
            return;
        }

        Log($"Pipeline test completed {DateTime.Now:HH:mm:ss:ff}");
    }
Polynomial Proton
  • 5,020
  • 20
  • 37
Nicholas
  • 572
  • 6
  • 17