
I am writing a Windows Service. I have a 'backlog' (in SQL) of records that have to be processed by the service. The backlog might also be empty. The record processing is a potentially very long running operation (3+ minutes).

I have a class with a method in it which goes to SQL, chooses a record and processes it, if there are any records to process. Then the method will exit and that's it. Note: I can't know in advance which records will be processed - the class method decides this as part of its logic.

I want to achieve parallel processing. I want to have X number of workers (where X is the optimal for the host PC) at any time. While the backlog is empty, those workers finish their jobs and exit pretty quickly (~50-100ms, maybe). I want any 'freed' worker to start over again (i.e. re-run).

I have done some reading and I deduce that ThreadPool is not a good option for long-running operations. The .NET 4.0+ parallel library is not a good option either, as I don't want to wait for all workers to finish and I don't want to predefine/declare the tasks in advance.

In layman terms I want to have X workers who query the data source for items and when some of them find such - operate on it, the rest would continue to look for newly pushed items into the backlog.

What would be the best approach? I think I will have to manage the threads entirely by myself? i.e. first step - determine the optimum number of threads (perhaps by checking the Environment.ProcessorCount) and then start the X threads. Monitor for IsAlive on each thread and restart it? This seems awfully unprofessional.
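In code, the worker loop I picture looks roughly like this (a sketch only; `TryProcessOneRecord` is a hypothetical stand-in for my class method, and the short-lived cancellation token is just there so the sketch terminates):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class WorkerLoopSketch
{
    // Hypothetical stand-in for the class method: returns true if it found
    // and processed a record, false if the backlog was empty.
    static bool TryProcessOneRecord() => false;

    static void Main()
    {
        int workerCount = Environment.ProcessorCount;
        // In the real service this token would come from the service's Stop
        // handler; here it simply ends the demo after 200 ms.
        using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));

        var workers = new Task[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            workers[i] = Task.Factory.StartNew(() =>
            {
                while (!cts.Token.IsCancellationRequested)
                {
                    if (!TryProcessOneRecord())
                        Thread.Sleep(50); // backlog empty: brief pause, then re-run
                }
            }, TaskCreationOptions.LongRunning);
        }

        Task.WaitAll(workers);
        Console.WriteLine("all workers exited");
    }
}
```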

Any suggestions?

hyankov
  • You could implement a producer/consumer pattern based on a `BlockingCollection`. One thread monitors the database and enqueues items, then n threads dequeue the items to process them (`foreach (var item in blockingCollection.GetConsumingEnumerable())`) – Kevin Gosse Jun 13 '16 at 15:14
  • Did you check the [`Task`](https://msdn.microsoft.com/en-us/library/system.threading.tasks.task(v=vs.110).aspx?f=255&MSPPError=-2147217396) class? You don't have to `await` them once launched, but you can still track them. Benefit? The framework will manage the optimal number of Tasks that can run simultaneously – Irwene Jun 13 '16 at 15:16
  • The threadpool manager already does this; it is pretty unclear why you want to help. Maybe you haven't tried it? If you did and you don't observe 100% processor usage then that's almost always because the expensive work is done by the dbase engine. Pummeling it with more requests just delays the requests. Or begets you a ticked-off admin that will throttle you. – Hans Passant Jun 13 '16 at 15:25
  • I have multiple sources on SF saying that the ThreadPool should not be used for long-running operations. @KooKiz, not an option - the task itself decides which operation it wants to process from the backlog. – hyankov Jun 13 '16 at 15:27

3 Answers


You can start one task per core; as tasks finish, start new ones. You can set numOfThreads from ProcessorCount or to a specific number:

int numOfThreads = System.Environment.ProcessorCount;
// int numOfThreads = X;
var tasks = new List<Task>();
for (int i = 0; i < numOfThreads; i++)
    tasks.Add(Task.Factory.StartNew(() => { /* process one record */ }));

while (tasks.Count > 0) // wait for a task to finish
{
    int index = Task.WaitAny(tasks.ToArray());
    tasks.RemoveAt(index);
    if (moreWorkRemains) // your check for remaining backlog items
        tasks.Add(Task.Factory.StartNew(() => { /* process the next record */ }));
}

or

var options = new ParallelOptions();
options.MaxDegreeOfParallelism = System.Environment.ProcessorCount;
Parallel.For(0, N, options, (i) => { /* long-running computation */ });

or

You can implement the Producer-Consumer pattern with a `BlockingCollection`.

This topic is taught excellently by Dr. Joe Hummel in his Pluralsight course "Async and Parallel Programming: Application Design".
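A minimal sketch of that producer/consumer pattern, with the SQL polling replaced by a simulated backlog of integer ids (the counting is only there to make the demo verifiable; your long-running processing goes where the counter is incremented):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class BacklogDemo
{
    static void Main()
    {
        // Bounded so the producer cannot run arbitrarily far ahead of the workers.
        using var backlog = new BlockingCollection<int>(boundedCapacity: 100);

        // One producer: in the real service this would poll SQL and enqueue ids.
        var producer = Task.Run(() =>
        {
            for (int id = 1; id <= 20; id++)
                backlog.Add(id);
            backlog.CompleteAdding(); // signal "no more work"
        });

        // N long-running consumers: each blocks until an item is available.
        int workerCount = Environment.ProcessorCount;
        int processed = 0;
        var workers = Enumerable.Range(0, workerCount).Select(_ =>
            Task.Factory.StartNew(() =>
            {
                foreach (var id in backlog.GetConsumingEnumerable())
                    Interlocked.Increment(ref processed); // long-running work goes here
            }, TaskCreationOptions.LongRunning)).ToArray();

        producer.Wait();
        Task.WaitAll(workers);
        Console.WriteLine($"processed {processed} items");
    }
}
```

`GetConsumingEnumerable` blocks while the collection is empty and completes only after `CompleteAdding` is called, so idle workers wait cheaply instead of spinning.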

MustangManiac

Consider using ActionBlock<T> from the TPL Dataflow library. It can be configured to process multiple messages concurrently, using all available CPU cores.

ActionBlock<QueueItem> _processor;
Task _completionTask;
bool _done;

async Task ReadQueueAsync(int pollingInterval)
{
    while (!_done)
    {
        // Get a list of items to process from the SQL database
        var list = ...;

        // Schedule the work
        foreach (var item in list)
        {
            _processor.Post(item);
        }

        // Give the SQL server time to re-fill the queue
        await Task.Delay(pollingInterval);
    }

    // Signal the processor that we are done
    _processor.Complete();
}

void ProcessItem(QueueItem item)
{
    // Do your work here
}

void Setup()
{
    // Configure the action block to process items concurrently,
    // using all available CPU cores
    _processor = new ActionBlock<QueueItem>(ProcessItem,
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
    _done = false;

    var queueReaderTask = ReadQueueAsync(QUEUE_POLL_INTERVAL);
    _completionTask = Task.WhenAll(queueReaderTask, _processor.Completion);
}

void Complete()
{
    _done = true;
    _completionTask.Wait();
}
alexm
  • Thanks for your post. However: I can't get all records to process from the db, they could be thousands. Only one record is retrieved at a time, based on a query within the actual processing logic. – hyankov Jun 13 '16 at 17:52
  • All those records are to be scheduled first; for simplicity a QueueItem can contain just a record id, and ProcessItem() can retrieve the details. A queue can easily contain tens of thousands of ints; it is not that hard. – alexm Jun 13 '16 at 19:05

Per MaxDegreeOfParallelism's documentation: "Generally, you do not need to modify this setting. However, you may choose to set it explicitly in advanced usage scenarios such as these:

  1. When you know that a particular algorithm you're using won't scale beyond a certain number of cores. You can set the property to avoid wasting cycles on additional cores.

  2. When you're running multiple algorithms concurrently and want to manually define how much of the system each algorithm can utilize. You can set a MaxDegreeOfParallelism value for each.

  3. When the thread pool's heuristics is unable to determine the right number of threads to use and could end up injecting too many threads. For example, in long-running loop body iterations, the thread pool might not be able to tell the difference between reasonable progress or livelock or deadlock, and might not be able to reclaim threads that were added to improve performance. In this case, you can set the property to ensure that you don't use more than a reasonable number of threads."

If you do not have an advanced usage scenario like the 3 cases above, you may want to hand your list of items or tasks to be run to the Task Parallel Library and let the framework handle the processor count.

List<InfoObject> infoList = GetInfo();
ConcurrentQueue<ResultObject> output = new ConcurrentQueue<ResultObject>();

await Task.Run(() =>
{
    Parallel.ForEach<InfoObject>(infoList, (item) =>
    {
        ResultObject result = ProcessInfo(item);
        output.Enqueue(result);
    });
    });
});

foreach(var resultObj in output)
{
    ReportOnResultObject(resultObj);
}

OR

List<InfoObject> infoList = GetInfo();
List<Task<ResultObject>> tasks = new List<Task<ResultObject>>();

foreach (var item in infoList)
{
    tasks.Add(Task.Run(() => ProcessInfo(item)));
}

var results = await Task.WhenAll(tasks);

foreach(var resultObj in results)
{
    ReportOnResultObject(resultObj);
}

H/T to IAmTimCorey tutorials:

https://www.youtube.com/watch?v=2moh18sh5p4

https://www.youtube.com/watch?v=ZTKGRJy5P2M

jaybro
  • The `List` class [is not thread safe](https://stackoverflow.com/questions/5020486/listt-thread-safety). – Theodor Zoulias Jul 28 '21 at 17:03
  • @TheodorZoulias good point, thread safety addressed in first example. However, the overall point still stands, why not let the framework handle the processor count? – jaybro Jul 29 '21 at 20:05
  • The `ConcurrentBag` is probably the worse concurrent collection for this job. It works, but it doesn't preserve the insertion order. The `ConcurrentQueue` [is much preferable](https://stackoverflow.com/questions/15400133/when-to-use-blockingcollection-and-when-concurrentbag-instead-of-listt/64823123#64823123). As for why not letting the framework handle the processor count, it's because [it does a lousy job at handling it](https://stackoverflow.com/a/66263583/11178549). Always configure the `MaxDegreeOfParallelism` when using the `Parallel.ForEach`. – Theodor Zoulias Jul 29 '21 at 20:13