Note: This is a follow-on question from a previous one I asked here.

Just to summarise quickly, my previous problem was with how to databind a BlockingCollection to a control in WPF, which was solved by using a CollectionViewSource.

However, I have thought about my use case a bit more and realised that simply using a BlockingCollection isn't going to work for me. I want the following behaviour:

  • One source of "work items", submitted to a common pool
  • Multiple "processors" of these work items
  • Items which are still "pending" and those which are being "processed" should both show up in the same view for databinding.

For example:

8 work items are submitted simultaneously, and the max level of concurrency is 4. Four of the work items should be moved into the "Processing" state, while the other four remain in "Pending". As each item in the "Processing" state completes, another item from the "Pending" state is picked up for processing. Once an item is finished processing, it is removed from the pool of work items. This is all visible to the user in real time.

The problem I had with the previous approach was that as an item was picked up for processing, it would disappear from the view because it had been "consumed" by the call to GetConsumingEnumerable. What I really wanted was for items to be safely picked out of the "pending" pool for processing, but still remain in the view so that status updates (via INotifyPropertyChanged) could be visible in the UI.
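To make that "consumed" behaviour concrete, here is a minimal stand-alone illustration (not the actual code from the previous question): each item yielded by GetConsumingEnumerable has already been removed from the collection by the time the loop body sees it.

```csharp
using System;
using System.Collections.Concurrent;

class WorkItem
{
    public string Status { get; set; }
}

class ConsumingDemo
{
    static void Main()
    {
        var pending = new BlockingCollection<WorkItem>();
        pending.Add(new WorkItem { Status = "Pending" });
        pending.CompleteAdding(); // so the loop ends in this demo

        foreach (var workItem in pending.GetConsumingEnumerable())
        {
            // The item was removed from 'pending' the moment it was yielded,
            // so a view bound to the collection stops showing it here.
            Console.WriteLine(pending.Count); // prints 0
        }
    }
}
```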

I have addressed the problem of items disappearing from the view by actually using two concurrent collections instead, then wrapping them up as a single CompositeCollection (which I bind to instead of using the ICollectionView)

I have implemented this behaviour as below:

this.currentWorkItems = new ObservableConcurrentCollection<WorkItem>();
this.pendingWorkItems = new ObservableConcurrentCollection<WorkItem>();

this.compositeCollection = new CompositeCollection
{
    new CollectionContainer { Collection = this.currentWorkItems},
    new CollectionContainer { Collection = this.pendingWorkItems },
};

for (int i = 0; i < workConcurrencyFactor; i++)
{
   Task.Factory.StartNew(this.ProcessWorkItems);
}

Then my Add method:

public void Add(WorkItem workItem)
{
    this.pendingWorkItems.TryAdd(workItem);
}

Finally, the ProcessWorkItems method:

private void ProcessWorkItems()
{
    while (true)
    {
        Thread.Sleep(100); // poll interval

        WorkItem workItem;
        if (this.pendingWorkItems.TryTake(out workItem))
        {
            // The item is briefly in neither collection here, so it can
            // flicker out of the bound view between these two calls.
            this.currentWorkItems.TryAdd(workItem);

            workItem.Status = "Simulating First Step";
            Thread.Sleep(1000);

            workItem.Status = "Simulating Second Step";
            Thread.Sleep(1000);

            // Finished processing. Note: TryTake removes the head of the
            // underlying collection, which is not necessarily this item
            // when several workers finish at around the same time.
            this.currentWorkItems.TryTake(out workItem);
        }
    }
}

Note: I'm using ObservableConcurrentCollection from here.

This works OK, but I feel like I'm missing something here, or that I might be incurring totally unnecessary overhead by having multiple tasks sleeping and waking constantly when nothing else is really happening. Also, I feel like I'm abusing the second ObservableConcurrentCollection somewhat, by essentially just using it as a holding area for items that I'm working on, but that I still want to be visible.
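For what it's worth, the sleep/poll loop can be avoided while keeping the two-collection idea: a BlockingCollection can serve purely as the hand-off mechanism (GetConsumingEnumerable blocks while the queue is empty), with the observable collections updated alongside it for the view. A rough sketch under those assumptions; the class and member names are placeholders, and the view bookkeeping is left as comments:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class WorkItem
{
    public string Status { get; set; }
}

class WorkPool
{
    // Coordination only: the workers block on this instead of polling.
    private readonly BlockingCollection<WorkItem> queue =
        new BlockingCollection<WorkItem>();

    private Task[] workers = new Task[0];

    public void Add(WorkItem workItem)
    {
        // Also add the item to the databound "pending" collection here.
        this.queue.Add(workItem);
    }

    public void Start(int concurrency)
    {
        this.workers = new Task[concurrency];
        for (int i = 0; i < concurrency; i++)
        {
            this.workers[i] = Task.Factory.StartNew(() =>
            {
                // Blocks while the queue is empty; no Thread.Sleep needed.
                foreach (var workItem in this.queue.GetConsumingEnumerable())
                {
                    // Move the item from "pending" to "processing" in the
                    // view-facing collections here.
                    workItem.Status = "Processing";
                    // ... do the actual work ...
                    workItem.Status = "Done";
                }
            }, TaskCreationOptions.LongRunning);
        }
    }

    public void Shutdown()
    {
        this.queue.CompleteAdding(); // lets the worker loops drain and exit
        Task.WaitAll(this.workers);
    }
}
```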

Is there a better approach to this problem? What is the standard pattern for concurrent consumers to process a collection "in place", whilst avoiding multiple consumers grabbing the same item?

Chris McAtackney
  • It sounds like a problem within the scope of the [TPL Dataflow](http://msdn.microsoft.com/en-us/library/hh228603%28v=vs.110%29.aspx) domain. If I were you I'd look into it to see if it helps. The dataflow pipeline would do the processing, while all items would just sit in the bound collection to see the updates (?). – Patryk Ćwiek Apr 18 '14 at 10:06

1 Answer

As Patryk already suggested, this is a good use case for TPL Dataflow; we do something similar here (just with several steps in the pipeline, including batching and transforming):

Create your Dataflow block to process the tasks and a collection to hold all of them:

var actionBlock = new ActionBlock<WorkItem>(item => ProcessWorkItem(item), 
   new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = threadCount });
var allItems = new ConcurrentDictionary<int, WorkItem>(); // or whatever fits

Then in the Add method:

public void Add(WorkItem workItem)
{
    // TryAdd rather than Add: ConcurrentDictionary has no plain Add method.
    allItems.TryAdd(workItem.Id, workItem);
    actionBlock.Post(workItem);
}

And at the end of ProcessWorkItem, remove the finished item with allItems.TryRemove(workItem.Id, out removed) (ConcurrentDictionary exposes TryRemove rather than a plain Remove).
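A minimal sketch of how that removal could look; the class and member names follow the snippets above, the actual work is elided, and the Dataflow hand-off is left as a comment so the sketch stands alone:

```csharp
using System.Collections.Concurrent;

class WorkItem
{
    public int Id { get; set; }
    public string Status { get; set; }
}

class WorkItemProcessor
{
    // Holds every item (pending and processing) for databinding purposes.
    private readonly ConcurrentDictionary<int, WorkItem> allItems =
        new ConcurrentDictionary<int, WorkItem>();

    public int Count { get { return this.allItems.Count; } }

    public void Add(WorkItem workItem)
    {
        this.allItems.TryAdd(workItem.Id, workItem);
        // this.actionBlock.Post(workItem); // hand off as shown above
    }

    public void ProcessWorkItem(WorkItem workItem)
    {
        workItem.Status = "Processing";
        // ... do the actual work ...

        // Finished: remove exactly this item by key, unlike TryTake on a
        // queue-backed collection, which removes whatever is at the head.
        WorkItem removed;
        this.allItems.TryRemove(workItem.Id, out removed);
    }
}
```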

P.S.: The dataflow blocks are pretty fast, too; we do several hundred Post calls per second here...

Christoph Fink
  • Nice example, cheers. I've implemented a prototype using this and it has seriously cut down on the amount of code in my view model. Plus it seems very fast - great stuff. – Chris McAtackney Apr 18 '14 at 11:39
  • Just following up.. I forgot to mention that my app is targeting .NET4.0 and I have no scope to upgrade to 4.5. You can imagine my reaction when I realised there is no stable release of DataFlow for 4.0 :-) I went back to the original idea of two concurrent dictionaries, but with the added extra of a ConcurrentQueue of "ticket numbers", which is used to enforce queue-like processing of the dictionaries. Performance is still very good, but maybe 25% slower than the DataFlow implementation - which is tolerable for my use case. Thanks again for the suggestion, I will use DF again some day! – Chris McAtackney Apr 19 '14 at 13:00
  • 1
    Maybe this helps: http://stackoverflow.com/questions/15338907/where-can-i-find-a-tpl-dataflow-version-for-4-0 – Christoph Fink Apr 19 '14 at 13:24
  • I did look into that, but the only version of the assembly that supports .NET 4.0 is pretty old - and is marked as "Pre-release" in the installer. I'm just going to pass on it for now, but thanks for the suggestion. I ended up rewriting my solution again to use BlockingCollection but with an ObservableConcurrentQueue as its backing store. Mix in the other ideas about using a CompositeCollection and it works very nicely. – Chris McAtackney Apr 24 '14 at 12:12