6

(NOTE: I'm using .Net 4, not .Net 4.5, so I cannot use the TPL's DataflowBlock classes.)

TL;DR Version

Ultimately, I'm just looking for a way to process sequential work items using multiple threads in a way that preserves their order in the final output, without requiring an unbounded output buffer.

Motivation

I have existing code to provide a multithreaded mechanism for processing multiple blocks of data where one I/O-bound thread (the "supplier") is reponsible for enqueuing blocks of data for processing. These blocks of data comprise the work items.

One or more threads (the "processors") are responsible for dequeuing one work item at a time, which they process and then write the processed data to an output queue before dequeuing their next work item.

A final I/O-bound thread (the "consumer") is responsible for dequeuing completed work items from the output queue and writing them to the final destination. These work items are (and must be) written in the same order that they were enqueued. I implemented this using a concurrent priority queue, where the priority of each item is defined by its source index.

I'm using this scheme to do some custom compression on a large data stream, where the compression itself is relatively slow but the reading of the uncompressed data and the writing of the compressed data is relatively fast (although I/O-bound).

I process the data in fairly large chunks of the order of 64K, so the overhead of the pipeline is relatively small.

My current solution is working well but it involves a lot of custom code written 6 years ago using many synchronisation events, and the design seems somewhat clunky; therefore I have embarked on academic excercise to see if it can be rewritten using more modern .Net libraries.

The new design

My new design uses the BlockingCollection<> class, and is based somewhat on this Microsoft article.

In particular, look at the section entitled Load Balancing Using Multiple Producers. I have tried using that approach, and therefore I have several processing tasks each of which takes work items from a shared input BlockingCollection and writes its completed items to its own BlockingCollection output queue.

Because each processing task has its own output queue, I'm trying to use BlockingCollection.TakeFromAny() to dequeue the first available completed work item.

The Multiplexer problem

So far so good, but now here comes the problem. The Microsoft article states:

The gaps are a problem. The next stage of the pipeline, the Display Image stage, needs to show images in order and without gaps in the sequence. This is where the multiplexer comes in. Using the TakeFromAny method, the multiplexer waits for input from both of the filter stage producer queues. When an image arrives, the multiplexer looks to see if the image's sequence number is the next in the expected sequence. If it is, the multiplexer passes it to the Display Image stage. If the image is not the next in the sequence, the multiplexer holds the value in an internal look-ahead buffer and repeats the take operation for the input queue that does not have a look-ahead value. This algorithm allows the multiplexer to put together the inputs from the incoming producer queues in a way that ensures sequential order without sorting the values.

Ok, so what happens is that the processing tasks can produce finished items in pretty much any order. The multiplexer is responsible for outputting these items in the correct order.

However...

Imagine that we have 1000 items to process. Further imagine that for some weird reason, the very first item takes longer to process that all the other items combined.

Using my current scheme, the multiplexer will keep reading and buffering items from all the processing output queues until it finds the next one that it's supposed to output. Since the item that its waiting for is (according to my "imagine if" above) only going to appear after ALL the other work items have been processed, I will effectively be buffering all the work items in the entire input!

The amount of data is way too large to allow this to happen. I need to be able to stop the processing tasks from outputting completed work items when the output queue has reached a certain maximum size (i.e. it's a bounded output queue) UNLESS the work item happens to be the one the multiplexer is waiting for.

And that's where I'm getting a bit stuck. I can think of many ways to actually implement this, but they all seem to be over-complex to the extent that they are no better than the code I'm thinking to replace!

What's my question?

My question is: Am I going about this the right way?

I would have thought this would be a well-understood problem, but my research has only turned up articles that seem to ignore the unbounded buffering problem that occurs if a work item takes a very long time compared to all the other work items.

Can anyone point me at any articles that describe a reasonable way to achieve this?

TL;DR Version

Ultimately, I'm just looking for a way to process sequential work items using multiple threads in a way that preserves their order in the final output, without requiring an unbounded output buffer.

svick
  • 236,525
  • 50
  • 385
  • 514
Matthew Watson
  • 104,400
  • 10
  • 158
  • 276

4 Answers4

2

Create a pool of items at startup, 1000, say. Store them on a BlockingCollection - a 'pool queue'.

The supplier gets items from the pool queue, loads them from the file, loads in the sequence-number/whatever and submits them to the processors threadpool.

The processors do their stuff and sends the output to the multiplexer. The multiplexer does it job of storing any out-of-order items until earlier items have been processed.

When an item has been completely consumed by whatever the multiplexer outputs to, they are returned to the pool queue for re-use by the supplier.

If one 'slow item' does require enormous amounts of processing, the out-of-order collection in the multiplexer will grow as the 'quick items' slip through on the other pool threads, but because the multiplexer is not actually feeding its items to its output, the pool queue is not being replenished.

When the pool empties, the supplier will block on it and will be unable to supply any more items.

The 'quick items' remaining on the processing pool input will get processed and then processing will stop except for the 'slow item'. The supplier is blocked, the multiplexer has [poolSize-1] items in its collection. No extra memory is being used, no CPU is being wasted, the only thing happening is the processing of the 'slow item'.

When the 'slow item' is finally done, it gets output to the multiplexer.

The multiplexer can now output all [poolSize] items in the required sequential order. As these items are consumed, the pool gets filled up again and the supplier, now able to get items from the pool, runs on, again reading its file an queueing up items to the processor pool.

Auto-regulating, no bounded buffers required, no memory runaway.

Edit: I meant 'no bounded buffers required' :)

Also, no GC holdups - since the items are re-used, they don't need GC'ing.

Martin James
  • 24,453
  • 3
  • 36
  • 60
  • That sounds like an interesting solution. I shall investigate! :) – Matthew Watson Feb 22 '13 at 12:59
  • I'm marking this as the solution because I used a modified version of the idea about a "pool queue". I'll post my code in the next day or two (when I have time). I'm still not completely happy about my multiplexor code - it still seems somewhat over complicated. Hopefully I may get some constructive criticism about it when I post it. ;) – Matthew Watson Feb 24 '13 at 14:21
1

I think you misunderstand the article. According to the description, it doesn't have an unbounded buffer, there will be at most one value in the look-ahread buffer for each queue. When you dequeue a value that's not the next one, you save it and then wait only on the queue that doesn't have a value in the buffer. (If you have multiple input buffers, the logic will have to be more complicated, or you would need a tree of 2 queue multiplexers.)

If you combine this with BlockingCollections that have specified bounded capacity, you get exactly the behavior you want: if one producer is too slow, the others will pause until the slow thread catches up.

svick
  • 236,525
  • 50
  • 385
  • 514
  • The queues going into the multiplexer are indeed bounded. But the fiddlyness I'm finding is that if you have more than two queues going into the multiplexer you can't use the `TakeFromAny()` method as described in the Microsoft article. Well, you can to get the first item, but if that item isn't in sequence you need to start peeking into the queues until you find the next item that you want. With two queues, it's easy, but with more than two my solutions became unwieldy. Perhaps the tree of 2 queue multiplexers would work, but again it doesn't sound better than what I already have! – Matthew Watson Feb 22 '13 at 12:53
  • I'll investigate the tree idea further. – Matthew Watson Feb 22 '13 at 12:59
  • @MatthewWatson You *can* use `TakeFromAny()`, you just need to use it on the right subset of queues. – svick Feb 22 '13 at 12:59
  • I see what you mean - I can keep reducing the set of queues until I receive the work item I want. In some cases in the end I might only be waiting on one queue, but in most cases I'll receive the item I want before then. I'll have a go at this tomorrow. :) – Matthew Watson Feb 22 '13 at 13:11
1

Have you considered not using manual producer/consumer buffering but instead the .AsParallel().AsOrdered() PLINQ alternative? Semantically, this is exactly what you want - a sequence of items processed in parallel but ordered in output. Your code could look as simple as...

var orderedOutput = 
    ReadSequentialBlocks()
    .AsParallel()
    .AsOrdered()
    .Select(ProcessBlock)
foreach(var item in orderedOutput)
    Sink(item);

The default degree of parallelism is the number of processors on your machine, but you can tune it. There is an automatic output buffer. If the default buffering consumes too many resources, you can turn it off:

.WithMergeOptions(ParallelMergeOptions.NotBuffered)

However, I'd certainly give the plain unadorned version a shot first - you never know, it might just work fine out of the box. Finally, if you want the simplicity of auto-multiplexing but a larger-than-zero yet non-automatic buffer, you could always use the PLINQ query to fill a fixed-size BlockingCollection<> which is read with a consuming enumerable on another thread.

Eamon Nerbonne
  • 47,023
  • 20
  • 101
  • 166
  • If you use `NotBuffered` that doesn't turn off the output buffer, it just means that items from the output buffer will be returned as soon as possible, not in chunks (which is what PLINQ normally does). But I think this way there is still the unbounded output buffer, so this doesn't actually do what the question asks. – svick Feb 22 '13 at 15:09
  • That's not how MSDN describes it: http://msdn.microsoft.com/en-us/library/dd997424.aspx. MSDN says it's analogous to streaming, and that it may take longer in total (which is what you'd expect if there no buffer). Of course, _effectively_ there's always a small buffer in the form of in-flight items, and probably also in the cross-thread handoff, but for the purposes of memory usage this isn't relevant. – Eamon Nerbonne Feb 22 '13 at 16:35
  • Note that there might still be an input buffer - but this approach is so much simpler than manually coding, it's almost certainly worth a shot. If indeed then benchmarking reveals that there is a memory usage issue, a more complex approach might be worth it. – Eamon Nerbonne Feb 22 '13 at 16:39
  • The whole question is about the memory issue, so if an answer doesn't address that, I think it's not really an answer. – svick Feb 22 '13 at 16:41
  • And I don't see anything in the documentation that contradicts what I said. Streaming output doesn't mean the processing of the input is limited. Though it seems there is some limit: if one item takes too long, the other threads pause their work after producing ~8k items each. – svick Feb 22 '13 at 16:48
  • The NonBuffered thing _does_ address the memory issue. Since the level of abstraction is higher one can speculate that there is nevertheless some buffer that's problematic; but it's probably the input buffer; and that could probably be addressed by a partitioner. – Eamon Nerbonne Feb 22 '13 at 16:56
  • In any case: he should _try_ it; it might satisfy his needs despite the less fine-grained control (and you're definitely right you don't quite have as much control), and if so it's the much simpler solution. – Eamon Nerbonne Feb 22 '13 at 16:57
  • Ok, so I wrote a quick linqpad mockup to see what happens that processes 1000 000 64k blocks with some uninteresting number crunching; it took about 4 minutes to run; there were never more than 473 blocks in flight (tracked via Interlocked), and the working set hovered around 160MB more than linqpad baseline. **In short: despite crunching 64GB of numbers, memory usage was low.** – Eamon Nerbonne Feb 22 '13 at 17:06
1

Follow up

For completeness, here is the code that I wound up with. Thanks to Martin James for his answer, which provided the basis for the solution.

I'm still not completely happy with the multiplexor (see ParallelWorkProcessor.multiplex()). It works, but it seems a bit klunky.

I used Martin James' idea about a work pool to prevent unbounded growth of the multiplexor buffer, however I substituted a SemaphoreSlim for the work pool queue (since it provides the same functionality, but it's a bit simpler to use and uses less resources).

The worker tasks write their completed items to a concurrent priority queue. This allows me to easily and efficiently find the next item to output.

I used a sample concurrent priority queue from Microsoft, modified to provide an autoreset event that's signalled whenever a new item is enqueued.

Here's the ParallelWorkProcessor class. You use it by providing it with three delegates; one to provide the work items, one to process a work item, and one to output a completed work item.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    public sealed class ParallelWorkProcessor<T> where T: class // T is the work item type.
    {
        public delegate T    Read();           // Called by only one thread.
        public delegate T    Process(T block); // Called simultaneously by multiple threads.
        public delegate void Write(T block);   // Called by only one thread.

        public ParallelWorkProcessor(Read read, Process process, Write write, int numWorkers = 0)
        {
            _read    = read;
            _process = process;
            _write   = write;

            numWorkers = (numWorkers > 0) ? numWorkers : Environment.ProcessorCount;

            _workPool    = new SemaphoreSlim(numWorkers*2);
            _inputQueue  = new BlockingCollection<WorkItem>(numWorkers);
            _outputQueue = new ConcurrentPriorityQueue<int, T>();
            _workers     = new Task[numWorkers];

            startWorkers();
            Task.Factory.StartNew(enqueueWorkItems);
            _multiplexor = Task.Factory.StartNew(multiplex);
        }

        private void startWorkers()
        {
            for (int i = 0; i < _workers.Length; ++i)
            {
                _workers[i] = Task.Factory.StartNew(processBlocks);
            }
        }

        private void enqueueWorkItems()
        {
            int index = 0;

            while (true)
            {
                T data = _read();

                if (data == null) // Signals end of input.
                {
                    _inputQueue.CompleteAdding();
                    _outputQueue.Enqueue(index, null); // Special sentinel WorkItem .
                    break;
                }

                _workPool.Wait();
                _inputQueue.Add(new WorkItem(data, index++));
            }
        }

        private void multiplex()
        {
            int index = 0; // Next required index.
            int last = int.MaxValue;

            while (index != last)
            {
                KeyValuePair<int, T> workItem;
                _outputQueue.WaitForNewItem(); // There will always be at least one item - the sentinel item.

                while ((index != last) && _outputQueue.TryPeek(out workItem))
                {
                    if (workItem.Value == null) // The sentinel item has a null value to indicate that it's the sentinel.
                    {
                        last = workItem.Key;  // The sentinel's key is the index of the last block + 1.
                    }
                    else if (workItem.Key == index) // Is this block the next one that we want?
                    {
                        // Even if new items are added to the queue while we're here, the new items will be lower priority.
                        // Therefore it is safe to assume that the item we will dequeue now is the same one we peeked at.

                        _outputQueue.TryDequeue(out workItem);
                        Contract.Assume(workItem.Key == index); // This *must* be the case.
                        _workPool.Release();                    // Allow the enqueuer to queue another work item.
                        _write(workItem.Value);
                        ++index;
                    }
                    else // If it's not the block we want, we know we'll get a new item at some point.
                    {
                        _outputQueue.WaitForNewItem();
                    }
                }
            }
        }

        private void processBlocks()
        {
            foreach (var block in _inputQueue.GetConsumingEnumerable())
            {
                var processedData = _process(block.Data);
                _outputQueue.Enqueue(block.Index, processedData);
            }
        }

        public bool WaitForFinished(int maxMillisecondsToWait) // Can be Timeout.Infinite.
        {
            return _multiplexor.Wait(maxMillisecondsToWait);
        }

        private sealed class WorkItem
        {
            public WorkItem(T data, int index)
            {
                Data  = data;
                Index = index;
            }

            public T   Data  { get; private set; }
            public int Index { get; private set; }
        }

        private readonly Task[] _workers;
        private readonly Task _multiplexor;
        private readonly SemaphoreSlim _workPool;
        private readonly BlockingCollection<WorkItem> _inputQueue;
        private readonly ConcurrentPriorityQueue<int, T> _outputQueue;
        private readonly Read    _read;
        private readonly Process _process;
        private readonly Write   _write;
    }
}

And here's my test code:

using System;
using System.Diagnostics;
using System.Threading;

namespace Demo
{
    public static class Program
    {
        private static void Main(string[] args)
        {
            _rng = new Random(34324);

            int threadCount = 8;
            _maxBlocks = 200;
            ThreadPool.SetMinThreads(threadCount + 2, 4); // Kludge to prevent slow thread startup.

            var stopwatch = new Stopwatch();

            _numBlocks = _maxBlocks;
            stopwatch.Restart();
            var processor = new ParallelWorkProcessor<byte[]>(read, process, write, threadCount);
            processor.WaitForFinished(Timeout.Infinite);

            Console.WriteLine("\n\nFinished in " + stopwatch.Elapsed + "\n\n");
        }

        private static byte[] read()
        {
            if (_numBlocks-- == 0)
            {
                return null;
            }

            var result = new byte[128];
            result[0] = (byte)(_maxBlocks-_numBlocks);
            Console.WriteLine("Supplied input: " + result[0]);
            return result;
        }

        private static byte[] process(byte[] data)
        {
            if (data[0] == 10) // Hack for test purposes. Make it REALLY slow for this item!
            {
                Console.WriteLine("Delaying a call to process() for 5s for ID 10");
                Thread.Sleep(5000);
            }

            Thread.Sleep(10 + _rng.Next(50));
            Console.WriteLine("Processed: " + data[0]);
            return data;
        }

        private static void write(byte[] data)
        {
            Console.WriteLine("Received output: " + data[0]);
        }

        private static Random _rng;
        private static int _numBlocks;
        private static int _maxBlocks;
    }
}
Matthew Watson
  • 104,400
  • 10
  • 158
  • 276
  • can you please post WaitForNewItem method code which you customised in ConcurrentPriorityQueue ? – Kishan Gajjar Oct 28 '15 at 16:31
  • @KishanGajjar Unfortunately the code is at work, so I'll have to wait until tomorrow morning to post it - sorry! – Matthew Watson Oct 28 '15 at 18:01
  • Can you provide the method today? – Kishan Gajjar Oct 29 '15 at 07:45
  • @KishanGajjar Unfortunately it seems that we don't have that code any more - we ended up using a different approach using the TPL. However you can find the original unmodified ConcurrentPriorityQueue [here](https://github.com/slashdotdash/ParallelExtensionsExtras/blob/master/src/CoordinationDataStructures/ConcurrentPriorityQueue.cs). You just need to add an AutoResetEvent field that you signal when `Enqueue()` is called, and add a method `WaitForNewItem()` which returns when the event is signalled. – Matthew Watson Oct 29 '15 at 09:41