
I have a producer/consumer scenario. The producer never stops, which means that even if at some point there are no items in the BlockingCollection (BC), further items can be added later.

Moving from .NET Framework 3.5 to 4.0, I decided to use a BlockingCollection as a concurrent queue between the consumer and the producer. I even added some parallel extensions so I could use the BC with a Parallel.ForEach.

The problem is that, in the consumer thread, I need a kind of hybrid model:

  1. I'm always checking the BC to process any item that arrives, with a `Parallel.ForEach(bc.GetConsumingEnumerable(), item => ...)`.
  2. Inside this foreach, I execute all the tasks that don't depend on each other.
  3. Here comes the problem. After parallelizing the previous tasks, I need to handle their results in the same FIFO order in which they were in the BC. The processing of these results should happen on a single synchronous thread.

A little example in pseudo code follows:

producer:

//This event is triggered each time a page is scanned. A new batch of pages can be added at the scanner at any time
private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{          
     //The object to add has a property with the sequence number
    _concurrentCollection.TryAdd(scannedPage);
}

consumer:

private void Init()
{
    _cancelTasks = false;
    _checkTask = Task.Factory.StartNew(() =>
            {
                while (!_cancelTasks)
                {
                    //BlockingCollections with Parallel ForEach
                    var bc = _concurrentCollection;
                    Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
                    {
                        ScannedPage currentPage = item;
                        // process a batch of images from the bc and check if an image has a valid barcode.
                    });
                    //Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.

                }
            });
}
            

Obviously, this can't work as it is, because `.GetConsumingEnumerable()` blocks when there is no item in the BC, so the `Parallel.ForEach` never completes and the code after it is never reached. I assume I could do it with tasks and just fire 4 or 5 tasks in the same batch, but:

  1. How could I do this with tasks and still have a waiting point before the tasks start that blocks until there is an item to be consumed in the BC? I don't want to start processing if there is nothing. Once there is something in the BC, I would start the batch of 4 tasks and use a `TryTake` inside each one, so that they don't block if there is nothing left to take. I can't rely on the BC always holding as many items as there are tasks in the batch; for example, there could be just one item left in the BC and a batch of 4 tasks.
  2. How could I do this and take advantage of the efficiency that Parallel.For offers?
  3. How could I save the results of the tasks in the same FIFO order in which the items were extracted from the BC?
  4. Is there any other concurrency class more suited to this kind of hybrid processing of items in the consumer?
  5. Also, this is my first question ever asked on Stack Overflow, so if you need any more data, or you just think my question is not correct, let me know.
Marcelman
  • your producer snippet is missing a paren and it also doesn't make any sense. – T McKeown May 20 '15 at 20:00
  • A few things: (1) use a `CancellationToken` and not a boolean flag to do cancellation. (2) given that, unless cancelled, this continues forever, when would you process a sequence of results? (3) what are these results? (4) you will need to have your producer determine a sequence number associated with the `ScannedPage` on enqueue, or alternatively `ScannedPage` instances must have a sequence number. – Alex May 20 '15 at 20:14
  • Hi Alex. In response to what you were mentioning: 1) Thanks! 2) What alternative do I have so it does not continue forever, but instead processes the items in batches of n elements every time it detects that the BC is not empty? 3) I receive some scanned pages in order, and in the parallel section I execute independent processes on each image (read a barcode, rotate it, etc.). After this part I need to save the images in the order they came (hence the FIFO requirement), because the path to save an image could depend on the image before it. 4) each item already has the sequence number. – Marcelman May 20 '15 at 20:36

2 Answers


I think I follow what you're asking. Why not create a `ConcurrentBag<ScannedPage>` and add to it while processing, like this:

while (!_cancelTasks)
{
   //BlockingCollection with Parallel.ForEach
   var bc = _concurrentCollection;
   var q = new ConcurrentBag<ScannedPage>();
   Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
   {
      ScannedPage currentPage = item;
      q.Add(item);
      // process a batch of images from the bc and check if an image has a valid barcode.
   });
   //Here should go the code that takes the results from each task, processes them in the same FIFO order in which they entered the BC and saves each image to a file, all of this in this same thread.

   //process items in your list here by sorting using some sequence key
   var items = q.OrderBy(o => o.SeqNbr).ToList();
   foreach (var item in items)
   {
      ...
   }
}

This obviously doesn't enqueue them in the exact order they were added to the BC, but you could add some sequence number to the ScannedPage object, like Alex suggested, and then sort the results afterwards.

Here's how I'd handle the sequence:

Add this to the ScannedPage class:

public static int _counter;  //public because this is just an example but it would work.

Get a sequence number and assign it here:

private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{
    // Interlocked.Increment is atomic and returns the incremented value,
    // so no lock is needed even if this handler runs on multiple threads.
    scannedPage.SeqNbr = System.Threading.Interlocked.Increment(ref ScannedPage._counter);
    ...
}
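Putting these pieces together, here is a minimal sketch of a consumer loop that blocks until at least one page is available, drains a small batch without blocking, processes the batch in parallel, and then handles the results in FIFO order on a single thread. This assumes `_concurrentCollection` is a `BlockingCollection<ScannedPage>`; `ProcessImage` and `SaveToFile` are hypothetical stand-ins for the actual per-page work:

```csharp
// Sketch only; assumes _concurrentCollection is a BlockingCollection<ScannedPage>
// and ScannedPage has a SeqNbr property set by the producer.
while (!_cancelTasks)
{
    var batch = new List<ScannedPage>();

    // Block until at least one item is available (-1 = infinite timeout)...
    ScannedPage first;
    if (_concurrentCollection.TryTake(out first, Timeout.Infinite))
        batch.Add(first);

    // ...then drain whatever else is already there, without blocking.
    ScannedPage next;
    while (batch.Count < 4 && _concurrentCollection.TryTake(out next))
        batch.Add(next);

    // Process the batch in parallel; completion order doesn't matter here.
    Parallel.ForEach(batch, page => ProcessImage(page)); // ProcessImage is hypothetical

    // Restore FIFO order on this single thread before saving.
    foreach (var page in batch.OrderBy(p => p.SeqNbr))
        SaveToFile(page); // SaveToFile is hypothetical
}
```

The batch size of 4 mirrors the "batch of 4 tasks" from the question; only the first `TryTake` blocks, so a lone item in the BC is processed immediately instead of waiting for a full batch.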
T McKeown
  • Thanks for your quick answer. I have a couple of questions about your answer: 1) Would this guarantee me the FIFO order (I mean, will it enqueue in the order of the foreach)? 2) Won't there be a problem because I am altering the item (in the processing part) after I added it to the queue? – Marcelman May 20 '15 at 20:25
  • Because you are parallelizing the for loop, the order is not guaranteed; you'd need some token on the object to indicate a sequence. I will edit in an alternative thread-safe way, assuming the ScannedPage had a `SeqNbr` property that you set earlier to indicate its order. – T McKeown May 20 '15 at 20:29
  • Why exactly would a Bag be better than a BlockingCollection (Queue)? – H H May 20 '15 at 21:06
  • No different, other than BC is meant for producer/consumer and all I need is a simple thread-safe list. It would work with a BC if you choose. – T McKeown May 20 '15 at 21:08
  • Hi T, thanks!!!!!, I will do that. Now for the other part of the question: what alternative do I have so it does not continue forever (inside the Parallel.ForEach, or block when there is no element), but instead processes the items in batches of n elements every time it detects that the BC is not empty? Is there some kind of Peek() method in BC that blocks until there is an element in it? – Marcelman May 20 '15 at 21:13
  • Look at `TryTake()`; with a -1 timeout it will wait forever or until cancelled. https://msdn.microsoft.com/en-us/library/dd287247%28v=vs.110%29.aspx – T McKeown May 20 '15 at 21:56

Whenever you need the results of a parallel operation, using PLINQ is generally more convenient than using the Parallel class. Here is how you could refactor your code using PLINQ:

private void Init()
{
    _cancelTasks = new CancellationTokenSource();
    _checkTask = Task.Run(() =>
    {
        while (true)
        {
            _cancelTasks.Token.ThrowIfCancellationRequested();

            var bc = _concurrentCollection;
            var partitioner = Partitioner.Create(
                bc.GetConsumingEnumerable(_cancelTasks.Token),
                EnumerablePartitionerOptions.NoBuffering);

            ScannedPage[] results = partitioner
                .AsParallel()
                .AsOrdered()
                .Select(scannedPage =>
                {
                    // Process the scannedPage
                    return scannedPage;
                })
                .ToArray();

            // Process the results
        }
    });
}

The .AsOrdered() is what ensures that you'll get the results in the same order as the input.

Be aware that when you consume a BlockingCollection<T> with the Parallel class or PLINQ, it is important to use a Partitioner with the EnumerablePartitionerOptions.NoBuffering configuration; otherwise there is a risk of deadlocks. The default greedy chunking behavior of Parallel/PLINQ and the blocking behavior of the BlockingCollection<T> do not interact well.
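With this setup, cancelling the token makes GetConsumingEnumerable throw an OperationCanceledException, which faults _checkTask. A minimal shutdown sketch under that assumption (the `Stop` method name is hypothetical):

```csharp
// Hypothetical shutdown helper, paired with the Init() above.
private void Stop()
{
    _cancelTasks.Cancel();
    try
    {
        _checkTask.Wait(); // observe the task's outcome
    }
    catch (AggregateException ex) when (
        ex.InnerExceptions.All(e => e is OperationCanceledException))
    {
        // Expected: the loop was cancelled, not failed.
    }
}
```

Any other exception (a real failure inside the loop) still propagates out of `Wait()`, so it is not silently swallowed.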

Theodor Zoulias