4

Currently, I process frames read from a video one by one, then write them to file. This seems inefficient and slow, so I'd like to split the work across multiple threads.

My current code can be summarized like this:

for(long n = 0; n < totalframes; n++) {
    using(Bitmap frame = vreader.ReadVideoFrame()) {
        Process(frame); //awfully slow
        WriteToFile(frame);
    }
}

How can I load, say, four frames, process them in four threads, wait for all of them to finish, then write them to file? It's critical that the frames are written in the exact same order as they were in the video.

Pacane
  • 20,273
  • 18
  • 60
  • 97
Peter W.
  • 2,323
  • 4
  • 22
  • 42
  • 2
    Parallel writing to files can end up being slower than storing them sequentially, depending on your storage device. – Fabian Bigler Jun 02 '13 at 11:32
  • 1
    @FabianBigler: I suspect it's the processing that's slow, not the writing. – Jon Skeet Jun 02 '13 at 11:33
  • I should probably clarify: I'm processing the frames before writing them to a GIF. That's why the order is so important. – Peter W. Jun 02 '13 at 11:35
  • There are 2 things you can do: 1) Make it so that the writeToFile function has a parameter the accepts the frame number 2) Use Parallel for the process the frame and when all are processed write them to a file – Svexo Jun 02 '13 at 11:47
  • Not sure about Parallel.ForEach() - the input is a buffer stream, and the output from each thread should be write immediately if no earlier data is outstanding. – Martin James Jun 02 '13 at 11:58
  • The Bitmap class explicitly prevents more than one thread from accessing its pixel data. Only one thread can call LockBits(). So there very first thing you'll need to do is create *extra* bitmaps that are a copy of the original video frame. A deep copy, Clone() isn't good enough. Once the processing is done, you'll need to glue the pieces back together. Clearly there's significant overhead involved in this, it also rather depends whether the processing algorithm easily permits this kind of subdivision. Being ahead with threads is *not* a slamdunk. – Hans Passant Jun 02 '13 at 14:36
  • There was an answer about a pipeline with example code here a few minutes ago and now it's gone. WTF? – Peter W. Jun 02 '13 at 14:57

7 Answers7

5

You can process the frames with for example a Parallel.ForEach(). And then read them with an iterator block (IEnumerable<>).

But the writing needs a little more attention. Make sure you attach a number to each frame and at the end of processing, dump them in a BlockingCollection<T> . Start a separate thread (Task) to process the queue and write the frames in order. This is a classic n-Producer / 1-Consumer solution.

user703016
  • 37,307
  • 8
  • 87
  • 112
H H
  • 263,252
  • 30
  • 330
  • 514
  • Yeah, something like that - not sure if it needs another thread. The pool thread that enters the 'reserialization cache' and finds that it can write its own data because all earlier frames have been written, and maybe cached frames from 'earlier' threads, can do the writing. The 'ReSerializer' would surely need to be a thread-safe collection. – Martin James Jun 02 '13 at 11:55
  • Possible but that would burden the worker threads with I/O at both ends. My suggestion already includes the reading (which could also be done by a separate single Task). Attaching an uneven Write at the end may upset the ForEach scheduler and or partioner. – H H Jun 02 '13 at 11:59
  • I can't really use ForEach here because the source is not an IEnumerable, but a single Bitmap. [AForge.NET's VideoFileReader](http://www.aforgenet.com/framework/docs/html/c77bc424-9fff-a248-7ab8-eed59929cfa5.htm) only has a .ReadVideoFrame() method which returns the next frame. – Peter W. Jun 02 '13 at 12:06
  • You can write an Iterator around a GetNext() method. I don't see the problem. A `Parallel.For()` would be harder to re-sequence. – H H Jun 02 '13 at 12:13
  • Ah, sorry, brain fart. Still, where do I store the frames and frame numbers until processing is finished? I can't just put them in a list, there could be too many frames for that. And I'd rather not blindly trust the user to _not_ try to create a GIF from a 3 minute long video. :D – Peter W. Jun 02 '13 at 12:20
  • That's where the Consumer comes in. It keeps the next-frame-nr-to-write and it only needs to delay frames that arrive too early, that should be <= nr-of-threads – H H Jun 02 '13 at 12:31
  • 1
    Why worry about order? Can't you just use `AsOrdered` to make sure they end up written in the same order they were read? Maybe `ReadFrames().AsParallel().AsOrdered().Select(ProcessFrame).Select(WriteFrame)` or something like that? – Gabe Jun 02 '13 at 15:42
  • @Gabe - that's worth posting as a separate answer. – H H Jun 02 '13 at 15:56
3

This is where you want a Pipeline. I have pretty much directly copied the code from Patterns of Parallel Programming, and introduced extra parallelism in step 2 (I have included examples using both parallel tasks and PLINQ). It's not too complicated, it works, and on my box it runs many times faster than the sequential version. You may not see the same degree of improvement in your code (because I'm guessing that your Process is a bit more involved than Thread.Sleep), but it will still run quicker.

Obviously, there is a lot of clutter due to additional parallellism and me trying to match your object model. Refer to page 55 of Patterns of Parallel Programming for the original, no-fuss sample code. It's a thing of beauty, so be sure to check it out (http://www.microsoft.com/en-au/download/details.aspx?id=19222).

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace PipelineExample
{
    /// <summary>
    /// Stack Overflow question 16882318.
    /// </summary>
    public class Program
    {
        /// <summary>
        /// This is our simulated "file". In essense it will contain the
        /// ID of each Frame which has been processed and written to file.
        /// </summary> 
        private static readonly List<long> FrameFile = new List<long>();

        /// <summary>
        /// This is a modification of Stephen Toub's Pipelines
        /// example from Patterns Of Parallel Programming.
        /// </summary>
        private static void RunPipeline(VReader vreader, long totalframes)
        {
            var rawFrames = new BlockingCollection<Bitmap>();
            var processedFrames = new BlockingCollection<Bitmap>();

            // Stage 1: read raw frames.
            var readTask = Task.Run(() =>
            {
                try
                {
                    for (long n = 0; n < totalframes; n++)
                    {
                        rawFrames.Add(vreader.ReadVideoFrame());
                    }
                }
                finally { rawFrames.CompleteAdding(); }
            });

            // Stage 2: process frames in parallel.
            var processTask = Task.Run(() =>
            {
                try
                {
                    // Try both - see which performs better in your scenario.
                    Step2WithParallelTasks(rawFrames, processedFrames);
                    //Step2WithPLinq(rawFrames, processedFrames);
                }
                finally { processedFrames.CompleteAdding(); }
            });

            // Stage 3: write results to file and dispose of the frame.
            var writeTask = Task.Run(() =>
            {
                foreach (var processedFrame in processedFrames.GetConsumingEnumerable())
                {
                    WriteToFile(processedFrame);
                    processedFrame.Dispose();
                }
            });

            Task.WaitAll(readTask, processTask, writeTask);
        }

        /// <summary>
        /// Processes frames in rawFrames and adds them to
        /// processedFrames preserving the original frame order.
        /// </summary>
        private static void Step2WithPLinq(BlockingCollection<Bitmap> rawFrames, BlockingCollection<Bitmap> processedFrames)
        {
            Console.WriteLine("Executing Step 2 via PLinq.");

            var processed = rawFrames.GetConsumingEnumerable()
                .AsParallel()
                .AsOrdered()
                .Select(frame =>
                {
                    Process(frame);
                    return frame;
                });

            foreach (var frame in processed)
            {
                processedFrames.Add(frame);
            }
        }

        /// <summary>
        /// Processes frames in rawFrames and adds them to
        /// processedFrames preserving the original frame order.
        /// </summary>
        private static void Step2WithParallelTasks(BlockingCollection<Bitmap> rawFrames, BlockingCollection<Bitmap> processedFrames)
        {
            Console.WriteLine("Executing Step 2 via parallel tasks.");

            var degreesOfParallellism = Environment.ProcessorCount;
            var inbox = rawFrames.GetConsumingEnumerable();

            // Start our parallel tasks.
            while (true)
            {
                var tasks = inbox
                    .Take(degreesOfParallellism)
                    .Select(frame => Task.Run(() =>
                    {
                        Process(frame);
                        return frame;
                    }))
                    .ToArray();

                if (tasks.Length == 0)
                {
                    break;
                }

                Task.WaitAll(tasks);

                foreach (var t in tasks)
                {
                    processedFrames.Add(t.Result);
                }
            }
        }

        /// <summary>
        /// Sequential implementation - as is (for comparison).
        /// </summary>
        private static void RunSequential(VReader vreader, long totalframes)
        {
            for (long n = 0; n < totalframes; n++)
            {
                using (var frame = vreader.ReadVideoFrame())
                {
                    Process(frame);
                    WriteToFile(frame);
                }
            }
        }

        /// <summary>
        /// Main entry point.
        /// </summary>
        private static void Main(string[] args)
        {
            // Arguments.
            long totalframes = 1000;
            var vreader = new VReader();

            // We'll time our run.
            var sw = Stopwatch.StartNew();

            // Try both for comparison.
            //RunSequential(vreader, totalframes);
            RunPipeline(vreader, totalframes);

            sw.Stop();

            Console.WriteLine("Elapsed ms: {0}.", sw.ElapsedMilliseconds);

            // Validation: count, order and contents.
            if (Range(1, totalframes).SequenceEqual(FrameFile))
            {
                Console.WriteLine("Frame count and order of frames in the file are CORRECT.");
            }
            else
            {
                Console.WriteLine("Frame count and order of frames in the file are INCORRECT.");
            }

            Console.ReadLine();
        }

        /// <summary>
        /// Simulate CPU work.
        /// </summary>
        private static void Process(Bitmap frame)
        {
            Thread.Sleep(10);
        }

        /// <summary>
        /// Simulate IO pressure.
        /// </summary>
        private static void WriteToFile(Bitmap frame)
        {
            Thread.Sleep(5);
            FrameFile.Add(frame.ID);
        }

        /// <summary>
        /// Naive implementation of Enumerable.Range(int, int) for long.
        /// </summary>
        private static IEnumerable<long> Range(long start, long count)
        {
            for (long i = start; i < start + count; i++)
            {
                yield return i;
            }
        }

        private class VReader
        {
            public Bitmap ReadVideoFrame()
            {
                return new Bitmap();
            }
        }

        private class Bitmap : IDisposable
        {
            private static int MaxID;
            public readonly long ID;

            public Bitmap()
            {
                this.ID = Interlocked.Increment(ref MaxID);
            }

            public void Dispose()
            {
                // Dummy method.
            }
        }
    }
}
Kirill Shlenskiy
  • 9,367
  • 27
  • 39
2

To operate on elements in parallel, use System.Linq's parallel methods, like ParallelEnumerable.Range(). To keep the elements in order, you can use .AsOrdered().

ParallelEnumerable.Range(0, totalframes)
                  .AsOrdered()
                  .Select(x => vreader.ReadVideoFrame())
                  .Select(Process)
                  .Select(WriteToFile);
Gabe
  • 84,912
  • 12
  • 139
  • 238
  • ParallelEnumerable.Range is an `(int, int)` method. The original question used `long`. Also your Select(Process) line implies that Process returns a Bitmap, which may or may not be the case. Otherwise this is an excellent answer. – Kirill Shlenskiy Jun 03 '13 at 00:33
  • @Kirill: I can't imagine what sort of video file could have more frames than fit into an `int`. At 60fps, 2^31 fames is more and a year of video! – Gabe Jun 03 '13 at 04:55
  • Fair point. I think I'll change my answer as well and get rid of that `long` ugliness. – Kirill Shlenskiy Jun 03 '13 at 06:38
  • It is elegant, but I'm not sure how exactly this will be executed. – H H Jun 06 '13 at 19:08
  • @Henk: The theory is that there is a buffer between each stage. The frames will be read out and buffered, passed to the processing stage in order, buffered there, and then passed to the writing stage in order. – Gabe Jun 06 '13 at 21:21
0

Yeah - you need a threadpool, some threads, a class for the input image data + a 'sequence number' or 'frame number' to identify the order and a thread-safe 'ReSerializer' class that has a container to cache all frames received 'out of order' until earlier frames come in.

Martin James
  • 24,453
  • 3
  • 36
  • 60
0

Perhaps 4 BackgroundWorker's. Pass a number from 1-4 to each besides the data itself - and in their RunWorkerCompleted event handler - check if all other 3 have finished... (You can use a bool[4] for that.)

As far as I know - you don't have to worry about 2 RunWorkerCompleted's being called at the same time, because they all run on the same thread.

ispiro
  • 26,556
  • 38
  • 136
  • 291
0

I had a similar problem that I asked about in this thread.

I did come up with a solution which seems to work ok, but it might seem far too complicated for your purposes.

It revolves around you being able to supply 3 delegates: One to retrieve a work item (in your case, it will return a Bitmap), one to process that work item and a final one to output that work item. It also allows you to specify the maximum number of concurrent threads that will be running - you could use this to limit memory usage. See the numTasks parameter in the ParallelBlockProcessor constructor below.

Only the processing delegate is called by multiple threads.

Like you, I needed to ensure that the final output was written in the same order as the original input. I used a priority queue for this.

There may be better solutions using .Net 4.5's TPL, but I was limited to .Net 4.

Here's the code I came up with - I think you could adapt it to your problem:

The ParallelBlockProcessor class:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading.Tasks;
using ConsoleApplication1;

namespace Demo
{
    public sealed class ParallelBlockProcessor<T> where T: class
    {
        public delegate T Read();            // Called by only one thread.
        public delegate T Process(T block);  // Called simultaneously by multiple threads.
        public delegate void Write(T block); // Called by only one thread.

        public ParallelBlockProcessor(Read read, Process process, Write write, int numTasks = 0)
        {
            Contract.Requires(read != null);
            Contract.Requires(process != null);
            Contract.Requires(write != null);
            Contract.Requires((0 <= numTasks) && (numTasks <= 64));

            _read    = read;
            _process = process;
            _write   = write;

            numTasks = (numTasks > 0) ? numTasks : Environment.ProcessorCount;

            _workPool   = new BlockingCollection<WorkItem>(numTasks*2);
            _inputQueue  = new BlockingCollection<WorkItem>(numTasks);
            _outputQueue = new ConcurrentPriorityQueue<int, T>();
            _processors  = new Task[numTasks];

            initWorkItems();
            startProcessors();
            Task.Factory.StartNew(enqueueBlocks);
            _dequeuer = Task.Factory.StartNew(dequeueBlocks);
        }

        private void startProcessors()
        {
            for (int i = 0; i < _processors.Length; ++i)
            {
                _processors[i] = Task.Factory.StartNew(processBlocks);
            }
        }

        private void initWorkItems()
        {
            for (int i = 0; i < _workPool.BoundedCapacity; ++i)
            {
                _workPool.Add(new WorkItem());
            }
        }

        private void enqueueBlocks()
        {
            int index = 0;

            while (true)
            {
                T data = _read();

                if (data == null)
                {
                    _inputQueue.CompleteAdding();
                    _outputQueue.Enqueue(index, null); // Special terminator WorkItem.
                    break;
                }

                WorkItem workItem = _workPool.Take();
                workItem.Data = data;
                workItem.Index = index++;

                _inputQueue.Add(workItem);
            }
        }

        private void dequeueBlocks()
        {
            int index = 0; // Next required index.
            int last = int.MaxValue;

            while (true)
            {
                KeyValuePair<int, T> workItem;
                _outputQueue.WaitForNewItem();   // There will always be at least one item - the sentinel item.

                while (_outputQueue.TryPeek(out workItem))
                {
                    if (workItem.Value == null) // The sentinel item has a null value to indicate that it's the sentinel.
                    {
                        last = workItem.Key;  // The sentinel's key is the index of the last block + 1.
                    }
                    else if (workItem.Key == index) // Is this block the next one that we want?
                    {
                        // Even if new items are added to the queue while we're here, the new items will be lower priority.
                        // Therefore it is safe to assume that the item we will dequeue now is the same one we peeked at.

                        _outputQueue.TryDequeue(out workItem);
                        Contract.Assume(workItem.Key == index);
                        _workPool.Add(new WorkItem()); // Free up a work pool item.     
                        _write(workItem.Value);
                        ++index;
                    }
                    else // If it's not the block we want, we know we'll get a new item at some point.
                    {
                        _outputQueue.WaitForNewItem();
                    }

                    if (index == last)
                    {
                        return;
                    }
                }
            }
        }

        private void processBlocks()
        {
            foreach (var block in _inputQueue.GetConsumingEnumerable())
            {
                var processedData = _process(block.Data);
                _outputQueue.Enqueue(block.Index, processedData);
            }
        }

        public bool WaitForFinished(int maxMillisecondsToWait) // Can be Timeout.Infinite.
        {
            return _dequeuer.Wait(maxMillisecondsToWait);
        }

        private sealed class WorkItem // Note: This is mutable.
        {
            public T   Data  { get; set; }
            public int Index { get; set; }
        }

        private readonly Task[] _processors;

        private readonly Task _dequeuer;

        private readonly BlockingCollection<WorkItem> _workPool;
        private readonly BlockingCollection<WorkItem> _inputQueue;
        private readonly ConcurrentPriorityQueue<int, T> _outputQueue;

        private readonly Read    _read;
        private readonly Process _process;
        private readonly Write   _write;
    }
}

The Priority Queue (adapted from Microsoft's one):

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;

namespace ConsoleApplication1
{
    /// <summary>Provides a thread-safe priority queue data structure.</summary> 
    /// <typeparam name="TKey">Specifies the type of keys used to prioritize values.</typeparam> 
    /// <typeparam name="TValue">Specifies the type of elements in the queue.</typeparam> 

    [SuppressMessage("Microsoft.Naming", "CA1711:IdentifiersShouldNotHaveIncorrectSuffix")]
    [SuppressMessage("Microsoft.Naming", "CA1710:IdentifiersShouldHaveCorrectSuffix")]
    [DebuggerDisplay("Count={Count}")] 

    public sealed class ConcurrentPriorityQueue<TKey, TValue> : 
        IProducerConsumerCollection<KeyValuePair<TKey,TValue>>  
        where TKey : IComparable<TKey> 
    { 
        /// <summary>Initializes a new instance of the ConcurrentPriorityQueue class.</summary> 
        public ConcurrentPriorityQueue() {} 

        /// <summary>Initializes a new instance of the ConcurrentPriorityQueue class that contains elements copied from the specified collection.</summary> 
        /// <param name="collection">The collection whose elements are copied to the new ConcurrentPriorityQueue.</param> 

        [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]

        public ConcurrentPriorityQueue(IEnumerable<KeyValuePair<TKey, TValue>> collection) 
        { 
            if (collection == null) throw new ArgumentNullException("collection"); 
            foreach (var item in collection) _minHeap.Insert(item); 
        } 

        /// <summary>Adds the key/value pair to the priority queue.</summary> 
        /// <param name="priority">The priority of the item to be added.</param> 
        /// <param name="value">The item to be added.</param> 
        public void Enqueue(TKey priority, TValue value) 
        { 
            Enqueue(new KeyValuePair<TKey, TValue>(priority, value)); 
        } 

        /// <summary>Adds the key/value pair to the priority queue.</summary> 
        /// <param name="item">The key/value pair to be added to the queue.</param> 
        public void Enqueue(KeyValuePair<TKey, TValue> item) 
        {
            lock (_syncLock)
            {
                _minHeap.Insert(item);
                _newItem.Set();
            }
        }

        /// <summary>Waits for a new item to appear.</summary>
        public void WaitForNewItem()
        {
            _newItem.WaitOne();
        }

        /// <summary>Attempts to remove and return the next prioritized item in the queue.</summary> 
        /// <param name="result"> 
        /// When this method returns, if the operation was successful, result contains the object removed. If 
        /// no object was available to be removed, the value is unspecified. 
        /// </param> 
        /// <returns> 
        /// true if an element was removed and returned from the queue succesfully; otherwise, false. 
        /// </returns> 
        public bool TryDequeue(out KeyValuePair<TKey, TValue> result) 
        { 
            result = default(KeyValuePair<TKey, TValue>); 
            lock (_syncLock) 
            { 
                if (_minHeap.Count > 0) 
                { 
                    result = _minHeap.Remove(); 
                    return true; 
                } 
            } 
            return false; 
        } 

        /// <summary>Attempts to return the next prioritized item in the queue.</summary> 
        /// <param name="result"> 
        /// When this method returns, if the operation was successful, result contains the object. 
        /// The queue was not modified by the operation. 
        /// </param> 
        /// <returns> 
        /// true if an element was returned from the queue succesfully; otherwise, false. 
        /// </returns> 
        public bool TryPeek(out KeyValuePair<TKey, TValue> result) 
        { 
            result = default(KeyValuePair<TKey, TValue>); 
            lock (_syncLock) 
            { 
                if (_minHeap.Count > 0) 
                { 
                    result = _minHeap.Peek(); 
                    return true; 
                } 
            } 
            return false; 
        } 

        /// <summary>Empties the queue.</summary> 
        public void Clear() { lock(_syncLock) _minHeap.Clear(); } 

        /// <summary>Gets whether the queue is empty.</summary> 
        public bool IsEmpty { get { return Count == 0; } } 

        /// <summary>Gets the number of elements contained in the queue.</summary> 
        public int Count 
        { 
            get { lock (_syncLock) return _minHeap.Count; } 
        } 

        /// <summary>Copies the elements of the collection to an array, starting at a particular array index.</summary> 
        /// <param name="array"> 
        /// The one-dimensional array that is the destination of the elements copied from the queue. 
        /// </param> 
        /// <param name="index"> 
        /// The zero-based index in array at which copying begins. 
        /// </param> 
        /// <remarks>The elements will not be copied to the array in any guaranteed order.</remarks> 
        public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index) 
        { 
            lock (_syncLock) _minHeap.Items.CopyTo(array, index); 
        } 

        /// <summary>Copies the elements stored in the queue to a new array.</summary> 
        /// <returns>A new array containing a snapshot of elements copied from the queue.</returns> 
        public KeyValuePair<TKey, TValue>[] ToArray() 
        { 
            lock (_syncLock) 
            { 
                var clonedHeap = new MinBinaryHeap(_minHeap); 
                var result = new KeyValuePair<TKey, TValue>[_minHeap.Count]; 
                for (int i = 0; i < result.Length; i++) 
                { 
                    result[i] = clonedHeap.Remove(); 
                } 
                return result; 
            } 
        } 

        /// <summary>Attempts to add an item in the queue.</summary> 
        /// <param name="item">The key/value pair to be added.</param> 
        /// <returns> 
        /// true if the pair was added; otherwise, false. 
        /// </returns> 
        bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryAdd(KeyValuePair<TKey, TValue> item) 
        { 
            Enqueue(item); 
            return true; 
        } 

        /// <summary>Attempts to remove and return the next prioritized item in the queue.</summary> 
        /// <param name="item"> 
        /// When this method returns, if the operation was successful, result contains the object removed. If 
        /// no object was available to be removed, the value is unspecified. 
        /// </param> 
        /// <returns> 
        /// true if an element was removed and returned from the queue succesfully; otherwise, false. 
        /// </returns> 
        bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryTake(out KeyValuePair<TKey, TValue> item) 
        { 
            return TryDequeue(out item); 
        } 

        /// <summary>Returns an enumerator that iterates through the collection.</summary> 
        /// <returns>An enumerator for the contents of the queue.</returns> 
        /// <remarks> 
        /// The enumeration represents a moment-in-time snapshot of the contents of the queue. It does not 
        /// reflect any updates to the collection after GetEnumerator was called. The enumerator is safe to 
        /// use concurrently with reads from and writes to the queue. 
        /// </remarks> 
        public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator() 
        { 
            var arr = ToArray(); 
            return ((IEnumerable<KeyValuePair<TKey, TValue>>)arr).GetEnumerator(); 
        } 

        /// <summary>Returns an enumerator that iterates through a collection.</summary> 
        /// <returns>An IEnumerator that can be used to iterate through the collection.</returns> 
        IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } 

        /// <summary>Copies the elements of the collection to an array, starting at a particular array index.</summary> 
        /// <param name="array"> 
        /// The one-dimensional array that is the destination of the elements copied from the queue. 
        /// </param> 
        /// <param name="index"> 
        /// The zero-based index in array at which copying begins. 
        /// </param> 
        void ICollection.CopyTo(Array array, int index) 
        { 
            lock (_syncLock) ((ICollection)_minHeap.Items).CopyTo(array, index); 
        } 

        /// <summary> 
        /// Gets a value indicating whether access to the ICollection is synchronized with the SyncRoot. 
        /// </summary> 
        bool ICollection.IsSynchronized { get { return true; } } 

        /// <summary> 
        /// Gets an object that can be used to synchronize access to the collection. 
        /// </summary> 
        object ICollection.SyncRoot { get { return _syncLock; } } 

        /// <summary>Implements a binary heap that prioritizes smaller values.</summary> 
        private sealed class MinBinaryHeap 
        { 
            private readonly List<KeyValuePair<TKey, TValue>> _items; 

            /// <summary>Initializes an empty heap.</summary> 
            public MinBinaryHeap() 
            { 
                _items = new List<KeyValuePair<TKey, TValue>>(); 
            } 

            /// <summary>Initializes a heap as a copy of another heap instance.</summary> 
            /// <param name="heapToCopy">The heap to copy.</param> 
            /// <remarks>Key/Value values are not deep cloned.</remarks> 
            public MinBinaryHeap(MinBinaryHeap heapToCopy) 
            { 
                _items = new List<KeyValuePair<TKey, TValue>>(heapToCopy.Items); 
            } 

            /// <summary>Empties the heap.</summary> 
            public void Clear() { _items.Clear(); } 

            /// <summary>Adds an item to the heap.</summary> 
            public void Insert(KeyValuePair<TKey,TValue> entry) 
            { 
                // Add the item to the list, making sure to keep track of where it was added. 
                _items.Add(entry); 
                int pos = _items.Count - 1; 

                // If the new item is the only item, we're done. 
                if (pos == 0) return; 

                // Otherwise, perform log(n) operations, walking up the tree, swapping 
                // where necessary based on key values 
                while (pos > 0) 
                { 
                    // Get the next position to check 
                    int nextPos = (pos-1) / 2; 

                    // Extract the entry at the next position 
                    var toCheck = _items[nextPos]; 

                    // Compare that entry to our new one.  If our entry has a smaller key, move it up. 
                    // Otherwise, we're done. 
                    if (entry.Key.CompareTo(toCheck.Key) < 0) 
                    { 
                        _items[pos] = toCheck; 
                        pos = nextPos; 
                    } 
                    else break; 
                } 

                // Make sure we put this entry back in, just in case 
                _items[pos] = entry; 
            } 

            /// <summary>Returns the entry at the top of the heap.</summary> 
            public KeyValuePair<TKey, TValue> Peek() 
            { 
                // Returns the first item 
                if (_items.Count == 0) throw new InvalidOperationException("The heap is empty."); 
                return _items[0]; 
            } 

            /// <summary>Removes the entry at the top of the heap.</summary> 
            public KeyValuePair<TKey, TValue> Remove() 
            { 
                // Get the first item and save it for later (this is what will be returned). 
                if (_items.Count == 0) throw new InvalidOperationException("The heap is empty."); 
                KeyValuePair<TKey, TValue> toReturn = _items[0]; 

                // Remove the first item if there will only be 0 or 1 items left after doing so.   
                if (_items.Count <= 2) _items.RemoveAt(0); 
                // A reheapify will be required for the removal 
                else 
                { 
                    // Remove the first item and move the last item to the front. 
                    _items[0] = _items[_items.Count - 1]; 
                    _items.RemoveAt(_items.Count - 1); 

                    // Start reheapify 
                    int current = 0, possibleSwap = 0; 

                    // Keep going until the tree is a heap 
                    while (true) 
                    { 
                        // Get the positions of the node's children 
                        int leftChildPos = 2 * current + 1; 
                        int rightChildPos = leftChildPos + 1; 

                        // Should we swap with the left child? 
                        if (leftChildPos < _items.Count) 
                        { 
                            // Get the two entries to compare (node and its left child) 
                            var entry1 = _items[current]; 
                            var entry2 = _items[leftChildPos]; 

                            // If the child has a lower key than the parent, set that as a possible swap 
                            if (entry2.Key.CompareTo(entry1.Key) < 0) possibleSwap = leftChildPos; 
                        } 
                        else break; // if can't swap this, we're done 

                        // Should we swap with the right child?  Note that now we check with the possible swap 
                        // position (which might be current and might be left child). 
                        if (rightChildPos < _items.Count) 
                        { 
                            // Get the two entries to compare (node and its left child) 
                            var entry1 = _items[possibleSwap]; 
                            var entry2 = _items[rightChildPos]; 

                            // If the child has a lower key than the parent, set that as a possible swap 
                            if (entry2.Key.CompareTo(entry1.Key) < 0) possibleSwap = rightChildPos; 
                        } 

                        // Now swap current and possible swap if necessary 
                        if (current != possibleSwap) 
                        { 
                            var temp = _items[current]; 
                            _items[current] = _items[possibleSwap]; 
                            _items[possibleSwap] = temp; 
                        } 
                        else break; // if nothing to swap, we're done 

                        // Update current to the location of the swap 
                        current = possibleSwap; 
                    } 
                } 

                // Return the item from the heap 
                return toReturn; 
            } 

            /// <summary>Gets the number of objects stored in the heap.</summary> 
            public int Count { get { return _items.Count; } } 

            internal List<KeyValuePair<TKey, TValue>> Items { get { return _items; } } 
        }

        private readonly AutoResetEvent _newItem = new AutoResetEvent(false);
        private readonly object _syncLock = new object();
        private readonly MinBinaryHeap _minHeap = new MinBinaryHeap();
    } 
}

The test program:

using System;
using System.Diagnostics;
using System.Threading;

namespace Demo
{
    public static class Program
    {
        private static void Main(string[] args)
        {
            _rng = new Random(34324);

            int threadCount = 8;
            int maxBlocks = 200;
            ThreadPool.SetMinThreads(threadCount + 2, 4);  // Kludge!

            var stopwatch = new Stopwatch();

            _numBlocks = maxBlocks;
            stopwatch.Restart();
            var processor = new ParallelBlockProcessor<byte[]>(read, process, write, threadCount);
            processor.WaitForFinished(Timeout.Infinite);

            Console.WriteLine("\n\nFinished in " + stopwatch.Elapsed + "\n\n");
        }

        private static byte[] read()
        {
            if (_numBlocks-- == 0)
            {
                return null;
            }

            var result = new byte[128];
            result[0] = (byte)_numBlocks;
            Console.WriteLine("Supplied input: " + _numBlocks);
            return result;
        }

        private static byte[] process(byte[] data)
        {
            if (data[0] == 190)/*!*/
            {
                Thread.Sleep(5000);
            }

            Thread.Sleep(10 + _rng.Next(50));
            Console.WriteLine("Processed: " + data[0]);
            return data;
        }

        private static void write(byte[] data)
        {
            Console.WriteLine("Received output: " + data[0]);
        }

        private static Random _rng;
        private static int _numBlocks;
    }
}
Community
  • 1
  • 1
Matthew Watson
  • 104,400
  • 10
  • 158
  • 276
0

You could use the CountdownEVent, which lets you wait on more than one thread.

example: static CountdownEvent _countdown = new CountdownEvent (3);

static void Main()
{
  new Thread (SaySomething).Start ("I am thread 1");
  new Thread (SaySomething).Start ("I am thread 2");
  new Thread (SaySomething).Start ("I am thread 3");

  _countdown.Wait();   // Blocks until Signal has been called 3 times
  Console.WriteLine ("All threads have finished speaking!");
}

static void SaySomething (object thing)
{
  Thread.Sleep (1000);
  Console.WriteLine (thing);
  _countdown.Signal();
}

This Code does not gurantee that threads 1-3 will execute in that order, However if you call the signal method first, i believe that should solve it

Another more Effecient approach would be to look to implement a Monitor.Pulse() and Monitor.Wait() mechanism, you could use this in junction with Thread.Sleep in theory to place a thread to sleep when it has completed executing a critical section, in your case a frame. after one thread has finished proccessing the frame put that thread to sleep and pulse the waiting thread do this continuously untill all frames are finished and then awake the threads.... Threads are tricky since they are difficult to know when they will finish executing.

KING
  • 938
  • 8
  • 26