
I want to build on this question.

So far I have a way to process jobs in parallel. I'm running this in a console app: I get, say, 50 jobs from the DB and process them using TPL Dataflow, and so far so good. But I realized that if one job takes an hour to process and the rest finish in 15 minutes, the console application keeps running for an hour without processing any further jobs. I can't change this to a Windows service, so I have to make the console application pick up new jobs as they come in, maybe checking every 15 minutes.

I could kick off a timer that checks for new jobs every 15 minutes. If there are any new jobs in the DB, I need to add them to the `BufferBlock` so the `ActionBlock` can process them. The problem is that after adding the first 50 jobs, you have to call `Complete()` and `Completion.Wait()` on the buffer and action blocks, so I can't add any more jobs to the existing buffer.
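A minimal sketch of the limitation described above, assuming a `BufferBlock<int>` linked to an `ActionBlock<int>` with plain integers standing in for jobs (the integer "jobs" and the `processed` bag are illustrative only). Once `Complete()` has been called on the buffer, `Post` returns `false` for anything that arrives later.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;

// Plain integers stand in for jobs (illustration only).
var processed = new ConcurrentBag<int>();
var buffer = new BufferBlock<int>();
var action = new ActionBlock<int>(job => processed.Add(job));
buffer.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });

// First batch: post everything, then signal that no more input will come.
for (int i = 1; i <= 5; i++)
{
    buffer.Post(i);
}
buffer.Complete();

// A job found later (e.g. by a timer) is declined: the buffer is already completed.
bool acceptedLateJob = buffer.Post(6);

await action.Completion; // only the first batch is ever processed
Console.WriteLine($"processed={processed.Count}, lateJobAccepted={acceptedLateJob}");
// prints: processed=5, lateJobAccepted=False
```

So a second batch needs either a new block pair or a block that is not completed until the whole run is over.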

I could check the current ActionBlock's `Completion.IsCompleted` and, while the current ActionBlock is still running, use a timer to check for new jobs and create a new BufferBlock/ActionBlock combination for them dynamically. This is what I plan to do, but before I go down that path, is there another approach I can take to handle this?

Alex J

1 Answer


If I understand correctly, all you want is a constant "flow" of jobs executed concurrently, with no more than 50 jobs held by the block at any time. In that case you can keep using the same `ActionBlock`, give it a `BoundedCapacity`, and add to it whenever it has room:

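```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

private static Task ProcessJobsAsync(CancellationToken cancellationToken)
{
    var block = new ActionBlock<Job>(
        job => job.Process(),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount, // Or any other value that fits
            BoundedCapacity = 50, // Jobs queued plus jobs currently running
        });
    // On cancellation, stop accepting new jobs; jobs already in the block still finish.
    cancellationToken.Register(block.Complete);
    var producer = Task.Run(async () =>
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var jobs = (await GetJobsAsync()).ToList();
            foreach (var job in jobs)
            {
                // Waits asynchronously while the block is at BoundedCapacity.
                await block.SendAsync(job, cancellationToken);
            }
            if (jobs.Count == 0)
            {
                // Nothing new: back off instead of hitting the DB in a tight loop
                // (the interval is arbitrary; pick one that fits).
                await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken);
            }
        }
    });

    return Task.WhenAll(producer, block.Completion);
}
```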

If the block is slow and reaches its capacity, `await block.SendAsync(job, cancellationToken)` asynchronously waits until a slot frees up for another job. That way there are always jobs ready to be executed. When you want to close the app (or cancel the operation), signal it through the `CancellationToken`: the block stops accepting new jobs and finishes the ones it already holds.
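To see this back-pressure in isolation, here is a small sketch (not part of the original answer) that uses a `TaskCompletionSource` as a hypothetical "gate" holding every job, with `BoundedCapacity = 2` instead of 50 so the block fills up immediately. The third `SendAsync` stays pending until the gate opens and a slot frees up.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Every job waits on this gate, so jobs already in the block cannot finish
// until it is opened. This makes "the block is full" easy to observe.
var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

var block = new ActionBlock<int>(
    async job => await gate.Task,
    new ExecutionDataflowBlockOptions { BoundedCapacity = 2 }); // room for 2 jobs (running + queued)

await block.SendAsync(1);              // accepted at once, then waits on the gate
await block.SendAsync(2);              // accepted at once (queued)
Task<bool> third = block.SendAsync(3); // block is full: this task stays pending
bool pendingWhileFull = !third.IsCompleted;

gate.SetResult(true);                  // let the jobs finish, freeing capacity
bool thirdAccepted = await third;      // now completes with true

block.Complete();
await block.Completion;
Console.WriteLine($"pending while full: {pendingWhileFull}, accepted later: {thirdAccepted}");
// prints: pending while full: True, accepted later: True
```

No event or timer is involved: the pending `SendAsync` task itself is the signal that room has opened up.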

i3arnon
  • I think I may not have been 100% clear. The console job runs on a timer every 5 minutes. The idea is to process new jobs as they become available, up to a maximum of 50 jobs per run. Sometimes there are only 10 jobs taking less than 5 minutes to complete, sometimes 30, some of them taking an hour. So if one job takes an hour, the console app runs for an hour (the next console instance waits until the current one is done). But if only one job takes an hour and the rest complete within 15 minutes, I want to keep checking for new jobs within the same console instance, which is why I want a timer. – Alex J Sep 02 '14 at 18:12
  • It is not a constant flow of jobs that need to be executed. The console job ends after processing 50 jobs (or however many are available) and wakes up again periodically to check. – Alex J Sep 02 '14 at 18:14
  • @AlexJ So you are running your app every 15 minutes? Why not run it once and let it keep fetching new items to process? – i3arnon Sep 02 '14 at 18:25
  • I think it would then need to be converted to a service, which I can't do right now. http://stackoverflow.com/questions/695877/c-sharp-console-application-keep-it-running – Alex J Sep 02 '14 at 18:29
  • @AlexJ That is a very weird design. Let's assume you are left with 1 job that takes an hour, and so you add more, but then you add another job that takes an hour, and so forth... What do you want to do then? – i3arnon Sep 02 '14 at 18:39
  • Yes, I understand. Unfortunately this is an existing app and I cannot change it drastically right now. It is OK if the console app has to run for, say, 4 hours (jobs won't always be that long). New instances won't be kicked off until this one is done, so it is almost like a service that ends and restarts. I assume even in a service there needs to be a timer to check for new jobs? – Alex J Sep 02 '14 at 18:46
  • @AlexJ Not necessarily. In my example you just wait (asynchronously) until you can actually process more jobs, and only then do you go and get more. – i3arnon Sep 02 '14 at 18:49
  • In your example, what event gets raised when I can go look for more jobs? It would be nice to have an event when a free slot opens up, so I can keep adding until all existing jobs complete and no new ones are available. – Alex J Sep 02 '14 at 19:00
  • @AlexJ The while loop always tries to add more, but I set a `BoundedCapacity` of 50, so when that limit is reached the next call to `SendAsync` will "block" (asynchronously) and the foreach continues only when there's space in the block. – i3arnon Sep 02 '14 at 19:03
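For the per-run behavior Alex J describes (keep topping up the same block while a long job runs, then let the console app exit once everything has finished and no new jobs show up), one possible sketch is a poll loop plus an in-flight counter. This is not i3arnon's code. `FetchNewJobsAsync` is a hypothetical stand-in for the DB query that returns three canned batches and then nothing, and the 100 ms interval stands in for the 5 to 15 minutes discussed in the comments; both are assumptions for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for the DB query: three canned batches, then nothing.
var batches = new Queue<int[]>(new[] { new[] { 1, 2, 3 }, new[] { 4, 5 }, new[] { 6 } });
Task<int[]> FetchNewJobsAsync() =>
    Task.FromResult(batches.Count > 0 ? batches.Dequeue() : Array.Empty<int>());

int inFlight = 0;  // jobs handed to the block that have not finished yet
int processed = 0;

var block = new ActionBlock<int>(
    async job =>
    {
        await Task.Delay(20 * job);            // simulated work of varying length
        Interlocked.Increment(ref processed);
        Interlocked.Decrement(ref inFlight);
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2, BoundedCapacity = 50 });

var pollInterval = TimeSpan.FromMilliseconds(100); // stand-in for the real polling interval
while (true)
{
    int[] jobs = await FetchNewJobsAsync();
    foreach (var job in jobs)
    {
        // Count the job before sending it, so inFlight never reads 0 while it is pending.
        Interlocked.Increment(ref inFlight);
        await block.SendAsync(job);
    }

    // Exit only when the poll found nothing AND nothing is queued or running.
    if (jobs.Length == 0 && Volatile.Read(ref inFlight) == 0)
    {
        break;
    }
    await Task.Delay(pollInterval);
}

block.Complete();
await block.Completion;
Console.WriteLine($"processed {processed} jobs");
// prints: processed 6 jobs
```

The exit check only runs when a poll returns nothing, so a long-running job keeps the loop alive and any jobs found in the meantime go into the same block, without creating a second BufferBlock/ActionBlock pair.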