
I have the following Scenario.

  1. I take 50 jobs from the database into a blocking collection.

  2. Each job is (potentially) long-running, so I want to run each one in a separate thread. (I know it may be better to run them with Task.WhenAll and let the TPL figure it out, but I want to control how many run simultaneously.)

  3. Say I want to run 5 of them simultaneously (configurable)

  4. I create 5 tasks (TPL), one for each job and run them in parallel.

What I want to do is to pick up the next Job in the blocking collection as soon as one of the jobs from step 4 is complete and keep going until all 50 are done.

I am thinking of creating a static BlockingCollection and a TaskCompletionSource which will be invoked when a job completes, so the consumer can then pick up the next job from the queue, one at a time. I would also like to use async/await on each job - but that's on top of this - I'm not sure if it has an impact on the approach.

Is this the right way to accomplish what I'm trying to do?

Similar to this link, but the catch is that I want to process the next job as soon as any of the first N items is done, not after all N are done.

Update:

Ok, I have this code snippet doing exactly what I want, if someone wants to use it later. As you can see below, 5 threads are created and each thread starts the next job as soon as it is done with the current one. Only 5 threads are active at any given time. I understand this may not always work exactly like this, and there will be context-switching overhead if it runs on a single CPU/core.

var block = new ActionBlock<Job>(
    job => Handler.HandleJob(job),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

foreach (Job j in GetJobs())
    await block.SendAsync(j);

Job 2 started on thread :13. wait time:3600000ms. Time:8/29/2014 3:14:43 PM
Job 4 started on thread :14. wait time:15000ms. Time:8/29/2014 3:14:43 PM
Job 0 started on thread :7. wait time:600000ms. Time:8/29/2014 3:14:43 PM
Job 1 started on thread :12. wait time:900000ms. Time:8/29/2014 3:14:43 PM
Job 3 started on thread :11. wait time:120000ms. Time:8/29/2014 3:14:43 PM
job 4 finished on thread :14. 8/29/2014 3:14:58 PM
Job 5 started on thread :14. wait time:1800000ms. Time:8/29/2014 3:14:58 PM
job 3 finished on thread :11. 8/29/2014 3:16:43 PM
Job 6 started on thread :11. wait time:1200000ms. Time:8/29/2014 3:16:43 PM
job 0 finished on thread :7. 8/29/2014 3:24:43 PM
Job 7 started on thread :7. wait time:30000ms. Time:8/29/2014 3:24:43 PM
job 7 finished on thread :7. 8/29/2014 3:25:13 PM
Job 8 started on thread :7. wait time:100000ms. Time:8/29/2014 3:25:13 PM
job 8 finished on thread :7. 8/29/2014 3:26:53 PM
Job 9 started on thread :7. wait time:900000ms. Time:8/29/2014 3:26:53 PM
job 1 finished on thread :12. 8/29/2014 3:29:43 PM
Job 10 started on thread :12. wait time:300000ms. Time:8/29/2014 3:29:43 PM
job 10 finished on thread :12. 8/29/2014 3:34:43 PM
Job 11 started on thread :12. wait time:600000ms. Time:8/29/2014 3:34:43 PM
job 6 finished on thread :11. 8/29/2014 3:36:43 PM
Job 12 started on thread :11. wait time:300000ms. Time:8/29/2014 3:36:43 PM
job 12 finished on thread :11. 8/29/2014 3:41:43 PM
Job 13 started on thread :11. wait time:100000ms. Time:8/29/2014 3:41:43 PM
job 9 finished on thread :7. 8/29/2014 3:41:53 PM
Job 14 started on thread :7. wait time:300000ms. Time:8/29/2014 3:41:53 PM
job 13 finished on thread :11. 8/29/2014 3:43:23 PM
job 11 finished on thread :12. 8/29/2014 3:44:43 PM
job 5 finished on thread :14. 8/29/2014 3:44:58 PM
job 14 finished on thread :7. 8/29/2014 3:46:53 PM
job 2 finished on thread :13. 8/29/2014 4:14:43 PM

Alex J
  • Regarding your update: My suggestion shouldn't have issues on a single core machine, because TPL can optimize and choose a lower degree of parallelism than the max (5) to reduce context switches. – i3arnon Aug 29 '14 at 21:07
  • Another note: I used `block.Post(item)` for a reason. Using `await block.SendAsync(item)` is redundant when you don't set a BoundedCapacity on the `ActionBlock`, and it (very slightly) hurts performance. – i3arnon Aug 29 '14 at 21:10
  • Yes, but if you note in my code sample, I'm no longer using `async job => await job.ProcessAsync()`, and I figured then using `block.SendAsync` might help? – Alex J Aug 29 '14 at 21:26
  • It doesn't. The consumer and producer are unrelated async-wise. You can have all 4 options of async/sync producer/consumer. You just said you want to use async-await, so I went with it in the example. – i3arnon Aug 29 '14 at 21:29

3 Answers


You can easily achieve what you need using TPL Dataflow.

What you can do is use BufferBlock<T>, which is a buffer for storing your data, and link it to an ActionBlock<T> that will consume those requests as they come in from the BufferBlock<T>.

Now, the beauty here is that you can specify how many requests you want the ActionBlock<T> to handle concurrently using the ExecutionDataflowBlockOptions class.

Here's a simplified console version, which processes a bunch of numbers as they come in and prints each number along with its Thread.CurrentThread.ManagedThreadId:

private static void Main(string[] args)
{
    var bufferBlock = new BufferBlock<int>();

    var actionBlock = new ActionBlock<int>(
        i => Console.WriteLine("Reading number {0} in thread {1}",
                               i, Thread.CurrentThread.ManagedThreadId),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

    bufferBlock.LinkTo(actionBlock);
    Produce(bufferBlock);

    Console.ReadKey();
}

private static void Produce(BufferBlock<int> bufferBlock)
{
    foreach (var num in Enumerable.Range(0, 500))
    {
        bufferBlock.Post(num);
    }
}

You can also post items asynchronously if needed, using the awaitable BufferBlock<T>.SendAsync.

That way, you let the TPL handle all the throttling for you without needing to do it manually.
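If you do go the SendAsync route, an async producer might look roughly like this (a sketch, not part of the original answer; DataflowLinkOptions.PropagateCompletion forwards the buffer's completion to the linked block so you can await the pipeline's end instead of blocking on ReadKey):

```csharp
private static async Task ProduceAsync(BufferBlock<int> bufferBlock)
{
    foreach (var num in Enumerable.Range(0, 500))
    {
        // SendAsync only actually waits when the target has a BoundedCapacity;
        // with an unbounded block it completes immediately.
        await bufferBlock.SendAsync(num);
    }
    bufferBlock.Complete(); // signal that no more items are coming
}

// Usage (replacing the LinkTo/Produce/ReadKey lines above):
// bufferBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });
// await ProduceAsync(bufferBlock);
// await actionBlock.Completion; // completes once every buffered item is processed
```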

Yuval Itzchakov
  • The `BufferBlock` is redundant here. – i3arnon Aug 29 '14 at 10:37
  • @l3arnon How else would you publish the data coming in? – Yuval Itzchakov Aug 29 '14 at 10:42
  • @l3arnon - So you delegate the work to be processed by the actual object? What would happen if he decides the flow needs multiple receivers? – Yuval Itzchakov Aug 29 '14 at 11:08
  • It doesn't matter, it's just for the example. The processing code can sit in the block. If by receivers you mean after the actionblock then there's no problem there. If you mean different actionblocks then that's a very different question. – i3arnon Aug 29 '14 at 11:15
  • What is the difference here using BufferBlock.SendAsync or actually calling job.ProcessAsync as in arnon's example? Lets say each job gets data from the db for 5 to 15 minutes, how will be the behavior in each method? does it wait for ANY of 5 to complete before starting on the next, or since the initial 5 is waiting on async, does it queue up the next 5. I really want only 5 running at a time. – Alex J Aug 29 '14 at 14:40
  • It will wait until the `async` method being awaited completes. – Yuval Itzchakov Aug 29 '14 at 14:56
  • ok, thank you for all your input. One last question. Does this ensure each job gets its own thread (assume the machine has 5 cores for example). I believe if there is only one core/processor, it will try to still get 5 threads if possible. I was going to put a longrunning option to force a thread, but that's not available for dataflow options. – Alex J Aug 29 '14 at 15:40
  • Why do you need to ensure each gets its own thread? Running this on a single core will simply cause context switching, which will affect performance. I suggest you let `TPL` balance the thread usage. The `MaxDegreeOfParallelism` ensures it won't go beyond that many threads; it doesn't guarantee it will use that number to process the requests. – Yuval Itzchakov Aug 29 '14 at 16:39
  • I have a requirement to process N jobs simultaneously where N is number of cores. If I force a thread, it is guaranteed. TPL can choose to only pick 3 threads for example though max says 5. Also I read somewhere that if job is a long running operation, TPL may have other jobs wait sometimes. – Alex J Aug 29 '14 at 17:28
  • You can specify `LongRunning` if you absolutely must. I would test performance with and without – Yuval Itzchakov Aug 29 '14 at 18:58
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/60265/discussion-between-alex-j-and-yuval-itzchakov). – Alex J Aug 29 '14 at 21:16

You can use BlockingCollection and it will work just fine, but it was built before async-await, so it blocks synchronously, which makes it less scalable in most cases.
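For comparison, the blocking version the question describes would look roughly like this (a sketch, reusing the question's `GetJobs` and `Handler.HandleJob`):

```csharp
var jobs = new BlockingCollection<Job>();
foreach (var job in GetJobs())
    jobs.Add(job);
jobs.CompleteAdding();

// 5 worker tasks; each one pulls the next job the moment it finishes the
// current one. GetConsumingEnumerable blocks its thread while waiting,
// which is exactly the synchronous blocking mentioned above.
var workers = Enumerable.Range(0, 5)
    .Select(_ => Task.Run(() =>
    {
        foreach (var job in jobs.GetConsumingEnumerable())
            Handler.HandleJob(job);
    }))
    .ToArray();

Task.WaitAll(workers);
```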

You're better off using the async-ready TPL Dataflow, as Yuval Itzchakov suggested. All you need is an ActionBlock that processes each item concurrently with a MaxDegreeOfParallelism of 5, and you post your work to it synchronously (block.Post(item)) or asynchronously (await block.SendAsync(item)):

private static void Main()
{
    var block = new ActionBlock<Job>(
        async job => await job.ProcessAsync(),
        new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 5});

    for (var i = 0; i < 50; i++)
    {
        block.Post(new Job());
    }

    Console.ReadKey();
}
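A side note (mine, not from the original answer): if you need to know when all 50 jobs have finished, rather than waiting on Console.ReadKey, you can complete the block and await it:

```csharp
block.Complete();        // signal that no more jobs will be posted
await block.Completion;  // completes once every queued job has been processed
```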
i3arnon
  • Ok, this is a concise sample and I think this will work for me, as I don't need multiple receivers. Question: if each Job is getting some data from the db, which could take say 5 to 15 minutes, and I do job.ProcessAsync, does it mean the action block will start processing the first 5, wait on getting data from the db for all of them, and then queue up the next 5? Or does it wait until ANY of the 5 actually completes and then kick off the next 1 and keep going 1 at a time? – Alex J Aug 29 '14 at 14:30
  • Ok, thank you. Also, if I introduce the LongRunning option to ExecutionDataflowBlockOptions, it won't make any difference other than starting these on separate threads, correct? Some can be really long-running (like an hour) and I have heard TPL sometimes may make other jobs wait if I don't do that. – Alex J Aug 29 '14 at 14:43
  • @AlexJ LongRunning is an option when you run tasks yourself. `TPL Dataflow` doesn't need that because it handles the scheduling itself. You just give it a maximum (i.e. `MaxDegreeOfParallelism`) that it can reach, but it can also choose a lower degree when appropriate. – i3arnon Aug 29 '14 at 21:02

You could do this with a SemaphoreSlim like in this answer, or using ForEachAsync like in this answer.
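For reference, a sketch of the SemaphoreSlim approach (assuming the same async `ProcessAsync` on `Job` used in the other answers):

```csharp
var throttler = new SemaphoreSlim(initialCount: 5);

var tasks = GetJobs().Select(async job =>
{
    // WaitAsync suspends (without blocking a thread) once 5 jobs are in flight.
    await throttler.WaitAsync();
    try
    {
        await job.ProcessAsync();
    }
    finally
    {
        // Release a slot so the next queued job can start.
        throttler.Release();
    }
}).ToArray();

await Task.WhenAll(tasks);
```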

NeddySpaghetti