
I have a number of producer tasks that push data into a BlockingCollection; let's call it requestQueue.

I also have a consumer task that pops requests from the requestQueue and sends asynchronous HTTP requests to a remote web service.

I need to throttle or block the number of active requests sent to the web service. On some machines that are far away from the service or have a slow internet connection, the HTTP response time is long enough that the active requests consume more memory than I'd like.

At the moment I am using a semaphore approach: calling WaitOne on the consumer thread before each request, and Release in the HTTP response callback. Is there a more elegant solution?

I am bound to .net 4.0, and would like a standard library based solution.
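For reference, the semaphore approach described above can be sketched roughly as follows on .NET 4.0 using `SemaphoreSlim`. The request type (`Uri`), the `maxActiveRequests` value, and the `BeginGetResponse` callback wiring are illustrative assumptions, not the original code:

```csharp
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

// Sketch only: throttle the number of in-flight HTTP requests.
var maxActiveRequests = 8; // assumed limit
var throttle = new SemaphoreSlim(maxActiveRequests);
var requestQueue = new BlockingCollection<Uri>();

// Consumer task: blocks when maxActiveRequests are already in flight.
var consumer = Task.Factory.StartNew(() =>
{
    foreach (var uri in requestQueue.GetConsumingEnumerable())
    {
        throttle.Wait(); // acquire a slot before starting the request
        var request = (HttpWebRequest)WebRequest.Create(uri);
        request.BeginGetResponse(ar =>
        {
            try
            {
                using (var response = request.EndGetResponse(ar))
                {
                    // process the response here
                }
            }
            finally
            {
                throttle.Release(); // free the slot when the response completes
            }
        }, null);
    }
}, TaskCreationOptions.LongRunning);
```

The `finally` block matters: releasing the slot even when `EndGetResponse` throws keeps the throttle from leaking permits on failed requests.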

Alexander Pope
  • Given that "I am bound to .net 4.0, and would like a standard library based solution." your solution using semaphore is good. One suggestion is to use `SemaphoreSlim` rather than `Semaphore` for performance. – Sriram Sakthivel Jan 12 '16 at 19:07
  • 1
    You should check out [TPL Dataflow](https://msdn.microsoft.com/en-us/library/hh228603(v=vs.110).aspx). You can get the download link for the .NET 4.0 version from http://stackoverflow.com/a/15405517/607701. – David Peden Jan 12 '16 at 19:34
  • I am familiar with the library, but I cannot use it in this project(don't ask why :( ). If I had 4.5 and freedom of choice, I wouldn't have posted the question. – Alexander Pope Jan 12 '16 at 19:46

1 Answer


You are already using a BlockingCollection, so why add a WaitHandle?

The way I would do it is to have a BlockingCollection with n as its bounded capacity, where n is the maximum number of concurrent requests you want to have at any given time.

You can then do something like this:

var n = 4;
var blockingQueue = new BlockingCollection<Request>(n);

Action<Request> consumer = request =>
{
    // do something with request.
};

var noOfWorkers = 4;
var workers = new Task[noOfWorkers];

for (int i = 0; i < noOfWorkers; i++)
{
    // TaskCreationOptions.DenyChildAttach is .NET 4.5+, so it is omitted here.
    // LongRunning hints the scheduler to give each worker a dedicated thread.
    var task = new Task(() =>
    {
        foreach (var item in blockingQueue.GetConsumingEnumerable())
        {
            consumer(item);
        }
    }, TaskCreationOptions.LongRunning);
    workers[i] = task;
    workers[i].Start();
}

Task.WaitAll(workers);

I leave cancellation and error handling to you, but with this approach you can also control how many workers are active at any given time. If the workers are all busy sending and processing requests, any producer will block until more room is available in the queue.
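On the producer side, the back-pressure is automatic: `Add` blocks once the queue already holds n items. A minimal usage sketch, where `incomingRequests` is an assumed source of `Request` objects:

```csharp
// Producers block on Add while the bounded queue is full, so no more
// than n requests are ever queued at once.
foreach (var request in incomingRequests) // incomingRequests is illustrative
{
    blockingQueue.Add(request); // blocks while the queue holds n items
}
blockingQueue.CompleteAdding(); // lets GetConsumingEnumerable finish, so workers exit
```

Calling `CompleteAdding` is what allows the `foreach` over `GetConsumingEnumerable()` in each worker to terminate, so `Task.WaitAll(workers)` eventually returns.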

  • The items in the BlockingCollection are not active requests. They become active after they are popped from the queue and forwarded to the web service (basically your consumer(item);). The upper bound on the blocking collection doesn't help here, since the consumer is fast, but the requests waiting for a response clog the memory. – Alexander Pope Jan 12 '16 at 19:31
  • In that case just add another one of these and use it as a pipeline. –  Jan 12 '16 at 19:33
  • Side note: use `Task.Run` instead of using the `Task` constructor and calling `Task.Start`. – Yacoub Massad Jan 12 '16 at 19:39
  • It would not be a pretty solution. Suppose I add another queue and push the active requests to it. The responses are not guaranteed to arrive in order, so popping the queue on the callback would result in mismatched response/request pairs. Your proposal is similar to my semaphore in effect, but with uglier semantics. – Alexander Pope Jan 12 '16 at 19:43
  • @AlexanderPope that was not in your requirement, perhaps next time you should make your question more clear. –  Jan 12 '16 at 19:46
  • I was describing your proposed solution in the previous comment, not another requirement. Sorry for not being clear. – Alexander Pope Jan 12 '16 at 19:50