0

I am trying to implement the following requirements (C# 4.0);

  • One "producer" (on the UI thread - driven by user action) which submits upload requests
  • A ListView control which is databound to the collection of pending upload requests
  • One consumer which process the requests (FIFO), and keeps the UI up-to-date as items status' update, or the request is fulfilled.

So far, I haven't been able to work out how to do the above without using two collections, as follows;

public void AddUploadRequest(UploadRequest uploadRequest)
{
    this.UploadRequests.Add(uploadRequest);
    this.uploadRequestsBlocking.Add(uploadRequest);
}

... where UploadRequests is an ObservableCollection<UploadRequest> and uploadRequestsBlocking is a BlockingCollection<UploadRequest>

The full code can be found here; http://pastebin.com/620EqaY5 (ignore the gross injection of Dispatcher - this is just prototype code so far)

I notify my UI of updates as follows;

this.dispatcher.Invoke(() => this.UploadRequests.Remove(uploadRequest));

Is there a better way to achieve this functionality? Or, more importantly, are there any serious drawbacks to this approach?

One possible extension I was considering was to have multiple consumers, by using Parallel.ForEach over GetConsumingPartitioner (based on this example). Would anything about the current approach make this unsuitable? It does work OK, but I'm not 100% confident that I haven't committed some major threading faux-pas somewhere along the way.

Chris McAtackney
  • 5,192
  • 8
  • 45
  • 69

2 Answers2

2

The main (only) reason that you can't just databind to the BlockingCollection is that it does not implement INotifyCollectionChanged, so the UI would not get updated when a request is added / removed. That said, there are BlockingCollection wrappers around on the net that implement INotifyCollectionChanged. Or you could instead expose a CollectionView for the BlockingCollection and instead of dispatching a remove from an ObservableCollection, just dispatch a Refresh of the CollectionView.

    public ICollectionView UploadRequestsView {get;set;}

    public UploadRequester(Dispatcher dispatcher)
    {
        this.dispatcher = dispatcher;           
        this.uploadRequestsBlocking = new BlockingCollection<UploadRequest>();

        UploadRequestsView = CollectionViewSource.GetDefaultView(uploadRequestsBlocking);

        this.consumerTask = Task.Factory.StartNew(this.ConsumeUploadRequests);
    }

    public void AddUploadRequest(UploadRequest uploadRequest)
    {
        uploadRequestsBlocking.Add(uploadRequest);
        UploadRequestsView.Refresh()
    }

    private void ConsumeUploadRequests()
    {
        foreach (var uploadRequest in this.uploadRequestsBlocking.GetConsumingEnumerable())
        {
            uploadRequest.Status = "Uploading...";

            Thread.Sleep(2000);
            uploadRequest.Status = "Successfully uploaded";

            Thread.Sleep(500);
            dispatcher.Invoke(() => UploadRequestsView.Refresh());
        }
    }
Community
  • 1
  • 1
Andrew Hanlon
  • 7,271
  • 4
  • 33
  • 53
  • Am I correct in saying that I would need to call UploadRequestsView.Refresh() inside the "AddUploadRequest" call (and so use the dispatcher on every addition)? – Chris McAtackney Apr 07 '14 at 15:46
  • Thanks Andrew - this approach seems to work well for my use case. – Chris McAtackney Apr 07 '14 at 16:30
  • I'm glad this helps - One thing to keep in mind is that if you have a large number of items and additions / removals happening quickly, this can hang the UI more readily than a Collection Change notification since the whole list is refreshed rather than a single item. One way to avoid this is to throttle the refresh to only occur every x milliseconds when needed. – Andrew Hanlon Apr 07 '14 at 17:18
1

I would use TPL and no need for injecting the Dispatcher since TaskScheduler.FromCurrentSynchronizationContext() will give me the current Dispatcher of the MainUI thread.

public class UploadRequestProcessor
{
        public ObservableCollection<UploadRequest> UploadRequests { get; private set; }
        private readonly BlockingCollection<UploadRequest> uploadRequestsBlocking;
        private Task consumerTask;

        public UploadRequester()
        {
                this.UploadRequests = new ObservableCollection<UploadRequest>();
                this.uploadRequestsBlocking = new BlockingCollection<UploadRequest>();
                this.consumerTask = Task.Factory.StartNew(this.ConsumeUploadRequests).ContinueWith(nextTask => RemoveUploadRequests(nextTask.Result as BlockingCollection<UploadRequest>), TaskScheduler.FromCurrentSynchronizationContext());
        }

        public void AddUploadRequest(UploadRequest uploadRequest)
        {
                this.UploadRequests.Add(uploadRequest);
                this.uploadRequestsBlocking.Add(uploadRequest);
        }

        private void ConsumeUploadRequests()
        {
            var requestsToRemove = new BlockingCollection<UploadRequest>();
                foreach (var uploadRequest in this.uploadRequestsBlocking.GetConsumingEnumerable())
                {
                        uploadRequest.Status = "Uploading...";

                        Thread.Sleep(2000);
                        uploadRequest.Status = "Successfully uploaded";

                        Thread.Sleep(500);
                        requestsToRemove.Add(uploadRequest);

                }
            return requestsToRemove;
        }

        private void RemoveUploadRequests(BlockingCollection<UploadRequest> requestsToRemove)
        {
            foreach(var request in requestsToRemove)
            {
                UploadRequests.Remove(request);
            }
        }
}
123 456 789 0
  • 10,565
  • 4
  • 43
  • 72
  • The problem here is that I never actually mark my BlockingCollection as complete, so "requestsToRemove" is never returned. Thanks for the SynchronizationContext tip though - will look at incorporating that. – Chris McAtackney Apr 07 '14 at 16:29