3

I have a set of huge tasks to be performed in C#. Each calculation produces result data which I want to write to a file (I am using SQLite). Currently I am doing this sequentially, like this: [Task1 -> FileSaving1], [Task2 -> FileSaving2], and so on.

But my priority is to complete all the calculations first, so I want to run the calculations in parallel in one thread and do the file saving in another. The FileSaving thread will be signalled every time a calculation is finished and its data is ready to be written. FileSaving can be sequential or parallel.

How can I achieve this in C#? I am using .NET 4.0. Please provide an example if possible.

Murugan
  • 1,441
  • 1
  • 12
  • 22

1 Answers1

2

You can use a BlockingCollection<T> to help with this.

The tricky thing is that you want several threads processing work items, but they can produce their output in a random order so you need to multiplex the output when writing it (assuming you want to write the data in the same order as it would have been written if you used the old single-threaded solution).

I wrote a class to do this a while back.

It assumes that you can encapsulate each "work item" in an instance of a class. Those instances are added to a work queue; then multiple threads (via Task) can remove work items from the work queue, process them, and then output them to a priority queue.

Finally, another thread can remove the completed work items from the completed queue, being careful to multiplex them so that it removes the items in the same order as they were originally added to the work queue.

This implementation creates and manages the threads for you. You need to tell it how many worker threads to use, and supply it delegates to provide new work items (Read()), process each work item (Process()) and output each work item (Write()).

Only the Process() delegate is called by multiple threads.

Note that if you don't care about the order, you can avoid all this stuff and pretty much use BlockingCollection directly.

Here's the code:

/// <summary>
/// Pipelines work items through three stages: a single reader, several parallel processors,
/// and a single writer that emits results in the same order in which the items were read.
/// </summary>
/// <remarks>
/// All tasks are started by the constructor. Call <see cref="WaitForFinished"/> to wait for the
/// writer stage to emit the last item.
/// A null work item is reserved as the end-of-input marker: <c>Read</c> returns null to stop, and
/// <c>Process</c> must never return null because the writer stage would mistake that result for
/// the end-of-input sentinel and stop early.
/// </remarks>
/// <typeparam name="T">The work item type.</typeparam>
public sealed class ParallelWorkProcessor<T> where T: class // T is the work item type.
{
    public delegate T    Read();           // Called by only one thread. Returns null to signal end of input.
    public delegate T    Process(T block); // Called simultaneously by multiple threads. Must not return null.
    public delegate void Write(T block);   // Called by only one thread.

    /// <summary>Creates the processor and immediately starts the reader, worker and writer tasks.</summary>
    /// <param name="read">Supplies the next work item, or null when there is no more input.</param>
    /// <param name="process">Transforms one work item; invoked concurrently by the worker tasks.</param>
    /// <param name="write">Receives each processed item, in the original input order.</param>
    /// <param name="numWorkers">Number of worker tasks; zero or negative means Environment.ProcessorCount.</param>
    /// <exception cref="ArgumentNullException">Any of the delegates is null.</exception>
    public ParallelWorkProcessor(Read read, Process process, Write write, int numWorkers = 0)
    {
        // Fail fast on the calling thread; otherwise a null delegate would only surface as a
        // NullReferenceException inside one of the background tasks.
        if (read == null)
        {
            throw new ArgumentNullException("read");
        }

        if (process == null)
        {
            throw new ArgumentNullException("process");
        }

        if (write == null)
        {
            throw new ArgumentNullException("write");
        }

        _read    = read;
        _process = process;
        _write   = write;

        numWorkers = (numWorkers > 0) ? numWorkers : Environment.ProcessorCount;

        // The semaphore caps the number of items that have been queued for processing but not yet
        // handed to the writer, so one slow item cannot make the output queue grow without bound.
        _workPool    = new SemaphoreSlim(numWorkers*2);
        _inputQueue  = new BlockingCollection<WorkItem>(numWorkers);
        _outputQueue = new ConcurrentPriorityQueue<int, T>();
        _workers     = new Task[numWorkers];

        startWorkers();
        Task.Factory.StartNew(enqueueWorkItems);
        _multiplexor = Task.Factory.StartNew(multiplex);
    }

    /// <summary>Starts one task per worker slot; each task runs processBlocks().</summary>
    private void startWorkers()
    {
        for (int i = 0; i < _workers.Length; ++i)
        {
            _workers[i] = Task.Factory.StartNew(processBlocks);
        }
    }

    /// <summary>
    /// Reader stage: pulls items from the Read delegate, tags each with a sequence index and
    /// queues it for the workers. On end of input it completes the input queue and posts the sentinel.
    /// </summary>
    private void enqueueWorkItems()
    {
        int index = 0;

        while (true)
        {
            T data = _read();

            if (data == null) // Signals end of input.
            {
                _inputQueue.CompleteAdding();
                _outputQueue.Enqueue(index, null); // Special sentinel item: key is last index + 1, value is null.
                break;
            }

            _workPool.Wait(); // Throttle: wait until the writer has released a slot.
            _inputQueue.Add(new WorkItem(data, index++));
        }
    }

    /// <summary>
    /// Writer stage: removes processed items from the priority queue strictly in index order and
    /// passes them to the Write delegate. Returns once every item before the sentinel has been written.
    /// </summary>
    private void multiplex()
    {
        int index = 0; // Next required index.
        int last = int.MaxValue; // Index of the sentinel; unknown until the sentinel has been seen.

        while (index != last)
        {
            KeyValuePair<int, T> workItem;
            _outputQueue.WaitForNewItem(); // There will always be at least one item - the sentinel item.

            while ((index != last) && _outputQueue.TryPeek(out workItem))
            {
                if (workItem.Value == null) // The sentinel item has a null value to indicate that it's the sentinel.
                {
                    last = workItem.Key;  // The sentinel's key is the index of the last block + 1.

                    if (index != last)
                    {
                        // The sentinel is at the head, so every block still owed to the writer has yet
                        // to be enqueued by a worker. Block until one arrives instead of re-peeking the
                        // sentinel in a tight loop (which would spin a core and contend on the queue lock).
                        _outputQueue.WaitForNewItem();
                    }
                }
                else if (workItem.Key == index) // Is this block the next one that we want?
                {
                    // Even if new items are added to the queue while we're here, the new items will be lower priority.
                    // Therefore it is safe to assume that the item we will dequeue now is the same one we peeked at.

                    _outputQueue.TryDequeue(out workItem);
                    Contract.Assume(workItem.Key == index); // This *must* be the case.
                    _workPool.Release();                    // Allow the enqueuer to queue another work item.
                    _write(workItem.Value);
                    ++index;
                }
                else // If it's not the block we want, we know we'll get a new item at some point.
                {
                    _outputQueue.WaitForNewItem();
                }
            }
        }
    }

    /// <summary>
    /// Worker stage: consumes queued items until the input queue is completed, runs the Process
    /// delegate on each and posts the result to the output queue under the item's original index.
    /// </summary>
    private void processBlocks()
    {
        foreach (var block in _inputQueue.GetConsumingEnumerable())
        {
            var processedData = _process(block.Data);
            _outputQueue.Enqueue(block.Index, processedData);
        }
    }

    /// <summary>Waits for the writer stage to finish.</summary>
    /// <param name="maxMillisecondsToWait">Timeout in milliseconds; can be Timeout.Infinite.</param>
    /// <returns>true if the writer task completed within the timeout; otherwise false.</returns>
    public bool WaitForFinished(int maxMillisecondsToWait) // Can be Timeout.Infinite.
    {
        return _multiplexor.Wait(maxMillisecondsToWait);
    }

    /// <summary>Pairs an input item with the sequence index assigned by the reader stage.</summary>
    private sealed class WorkItem
    {
        public WorkItem(T data, int index)
        {
            Data  = data;
            Index = index;
        }

        public T   Data  { get; private set; }
        public int Index { get; private set; }
    }

    private readonly Task[] _workers;
    private readonly Task _multiplexor;
    private readonly SemaphoreSlim _workPool;
    private readonly BlockingCollection<WorkItem> _inputQueue;
    private readonly ConcurrentPriorityQueue<int, T> _outputQueue;
    private readonly Read    _read;
    private readonly Process _process;
    private readonly Write   _write;
}

Here's the test code for it:

namespace Demo
{
    /// <summary>
    /// Console driver that pushes 200 small byte-array blocks through ParallelWorkProcessor and
    /// prints when each block is supplied, processed and received, so the ordering can be inspected.
    /// </summary>
    public static class Program
    {
        private static void Main(string[] args)
        {
            _rng = new Random(34324);

            int threadCount = 8;
            _maxBlocks = 200;
            ThreadPool.SetMinThreads(threadCount + 2, 4); // Kludge to prevent slow thread startup.

            _numBlocks = _maxBlocks;
            var stopwatch = Stopwatch.StartNew();
            var processor = new ParallelWorkProcessor<byte[]>(read, process, write, threadCount);
            processor.WaitForFinished(Timeout.Infinite);

            Console.WriteLine("\n\nFinished in " + stopwatch.Elapsed + "\n\n");
        }

        /// <summary>
        /// Supplies the next block, or null once all blocks have been handed out.
        /// The first byte of each block carries its 1-based sequence number.
        /// </summary>
        private static byte[] read()
        {
            if (_numBlocks-- == 0)
            {
                return null;
            }

            var result = new byte[128];
            result[0] = (byte)(_maxBlocks-_numBlocks);
            Console.WriteLine("Supplied input: " + result[0]);
            return result;
        }

        /// <summary>
        /// Simulates work by sleeping for a pseudo-random time and returns the block unchanged.
        /// Invoked concurrently by the worker tasks.
        /// </summary>
        private static byte[] process(byte[] data)
        {
            if (data[0] == 10) // Hack for test purposes. Make it REALLY slow for this item!
            {
                Console.WriteLine("Delaying a call to process() for 5s for ID 10");
                Thread.Sleep(5000);
            }

            // System.Random is not thread-safe and this method runs on several threads at once,
            // so serialise access to the shared instance. The sleep itself stays outside the lock.
            int jitter;

            lock (_rngLock)
            {
                jitter = _rng.Next(50);
            }

            Thread.Sleep(10 + jitter);
            Console.WriteLine("Processed: " + data[0]);
            return data;
        }

        /// <summary>Prints the sequence number of a block as it leaves the pipeline.</summary>
        private static void write(byte[] data)
        {
            Console.WriteLine("Received output: " + data[0]);
        }

        private static readonly object _rngLock = new object(); // Guards _rng.
        private static Random _rng;
        private static int _numBlocks;
        private static int _maxBlocks;
    }
}

This also requires a ConcurrentPriorityQueue implementation from here.

I had to modify that slightly, so here's my modified version:

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;


namespace ConsoleApplication1
{
    /// <summary>
    /// A priority queue guarded by a single lock, in which the entry with the smallest key is
    /// always the next one to be peeked or removed.
    /// </summary>
    /// <typeparam name="TKey">Type of the keys that order the entries.</typeparam>
    /// <typeparam name="TValue">Type of the payload carried by each entry.</typeparam>

    [SuppressMessage("Microsoft.Naming", "CA1711:IdentifiersShouldNotHaveIncorrectSuffix")]
    [SuppressMessage("Microsoft.Naming", "CA1710:IdentifiersShouldHaveCorrectSuffix")]
    [DebuggerDisplay("Count={Count}")]

    public sealed class ConcurrentPriorityQueue<TKey, TValue> :
        IProducerConsumerCollection<KeyValuePair<TKey,TValue>>
        where TKey : IComparable<TKey>
    {
        // Signalled by every Enqueue; consumed by WaitForNewItem.
        private readonly AutoResetEvent _newItem = new AutoResetEvent(false);

        // Guards every access to _minHeap made through the public members below.
        private readonly object _syncLock = new object();

        private readonly MinBinaryHeap _minHeap = new MinBinaryHeap();

        /// <summary>Creates an empty queue.</summary>
        public ConcurrentPriorityQueue()
        {
        }

        /// <summary>Creates a queue pre-populated with the entries of <paramref name="collection"/>.</summary>
        /// <param name="collection">The entries to copy into the new queue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="collection"/> is null.</exception>

        [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]

        public ConcurrentPriorityQueue(IEnumerable<KeyValuePair<TKey, TValue>> collection)
        {
            if (collection == null)
            {
                throw new ArgumentNullException("collection");
            }

            foreach (var pair in collection)
            {
                _minHeap.Insert(pair);
            }
        }

        /// <summary>Adds a value under the given priority and signals the new-item event.</summary>
        /// <param name="priority">Key that determines the entry's position in the queue.</param>
        /// <param name="value">Payload to store.</param>
        public void Enqueue(TKey priority, TValue value)
        {
            var pair = new KeyValuePair<TKey, TValue>(priority, value);

            lock (_syncLock)
            {
                _minHeap.Insert(pair);
                _newItem.Set();
            }
        }

        /// <summary>Adds a key/value pair and signals the new-item event.</summary>
        /// <param name="item">The pair to add; its key is the priority.</param>
        public void Enqueue(KeyValuePair<TKey, TValue> item)
        {
            lock (_syncLock)
            {
                _minHeap.Insert(item);
                _newItem.Set();
            }
        }

        /// <summary>Blocks the caller until the new-item event has been signalled by an Enqueue call.</summary>
        public void WaitForNewItem()
        {
            _newItem.WaitOne();
        }

        /// <summary>Removes the entry with the smallest key, if there is one.</summary>
        /// <param name="result">The removed entry on success; the default pair when the queue is empty.</param>
        /// <returns>true if an entry was removed; false if the queue was empty.</returns>
        public bool TryDequeue(out KeyValuePair<TKey, TValue> result)
        {
            lock (_syncLock)
            {
                bool hasEntry = _minHeap.Count != 0;
                result = hasEntry ? _minHeap.Remove() : default(KeyValuePair<TKey, TValue>);
                return hasEntry;
            }
        }

        /// <summary>Reads the entry with the smallest key without removing it.</summary>
        /// <param name="result">The head entry on success; the default pair when the queue is empty.</param>
        /// <returns>true if an entry was available; false if the queue was empty.</returns>
        public bool TryPeek(out KeyValuePair<TKey, TValue> result)
        {
            lock (_syncLock)
            {
                bool hasEntry = _minHeap.Count != 0;
                result = hasEntry ? _minHeap.Peek() : default(KeyValuePair<TKey, TValue>);
                return hasEntry;
            }
        }

        /// <summary>Removes every entry from the queue.</summary>
        public void Clear()
        {
            lock (_syncLock)
            {
                _minHeap.Clear();
            }
        }

        /// <summary>Gets whether the queue currently holds no entries.</summary>
        public bool IsEmpty
        {
            get
            {
                lock (_syncLock)
                {
                    return _minHeap.Count == 0;
                }
            }
        }

        /// <summary>Gets the current number of entries.</summary>
        public int Count
        {
            get
            {
                lock (_syncLock)
                {
                    return _minHeap.Count;
                }
            }
        }

        /// <summary>Copies the entries into <paramref name="array"/> starting at <paramref name="index"/>.</summary>
        /// <param name="array">Destination array.</param>
        /// <param name="index">Zero-based position in the destination at which copying starts.</param>
        /// <remarks>Entries are copied in internal heap layout order, not in priority order.</remarks>
        public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
        {
            lock (_syncLock)
            {
                _minHeap.Items.CopyTo(array, index);
            }
        }

        /// <summary>Takes a snapshot of the queue as an array sorted by ascending key.</summary>
        /// <returns>A new array; the queue itself is left untouched.</returns>
        public KeyValuePair<TKey, TValue>[] ToArray()
        {
            lock (_syncLock)
            {
                // Drain a copy of the heap so that the snapshot comes out in priority order.
                var scratch = new MinBinaryHeap(_minHeap);
                var ordered = new KeyValuePair<TKey, TValue>[scratch.Count];
                int next = 0;

                while (scratch.Count > 0)
                {
                    ordered[next++] = scratch.Remove();
                }

                return ordered;
            }
        }

        /// <summary>Adds an entry; always succeeds.</summary>
        /// <param name="item">The pair to add.</param>
        /// <returns>Always true.</returns>
        bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryAdd(KeyValuePair<TKey, TValue> item)
        {
            Enqueue(item);
            return true;
        }

        /// <summary>Removes the entry with the smallest key, if there is one.</summary>
        /// <param name="item">The removed entry on success; the default pair when the queue is empty.</param>
        /// <returns>true if an entry was removed; false if the queue was empty.</returns>
        bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryTake(out KeyValuePair<TKey, TValue> item)
        {
            return TryDequeue(out item);
        }

        /// <summary>Enumerates a snapshot of the queue taken at the moment of the call.</summary>
        /// <returns>An enumerator over the array produced by <see cref="ToArray"/>.</returns>
        /// <remarks>Later changes to the queue are not reflected by the enumerator.</remarks>
        public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
        {
            IEnumerable<KeyValuePair<TKey, TValue>> snapshot = ToArray();
            return snapshot.GetEnumerator();
        }

        /// <summary>Non-generic counterpart of <see cref="GetEnumerator"/>.</summary>
        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }

        /// <summary>Copies the entries into a weakly typed array starting at <paramref name="index"/>.</summary>
        /// <param name="array">Destination array.</param>
        /// <param name="index">Zero-based position in the destination at which copying starts.</param>
        void ICollection.CopyTo(Array array, int index)
        {
            lock (_syncLock)
            {
                ICollection backing = _minHeap.Items;
                backing.CopyTo(array, index);
            }
        }

        /// <summary>Always true: the collection reports itself as synchronized.</summary>
        bool ICollection.IsSynchronized
        {
            get { return true; }
        }

        /// <summary>Gets the lock object used internally by the queue.</summary>
        object ICollection.SyncRoot
        {
            get { return _syncLock; }
        }

        /// <summary>Array-backed binary min-heap; performs no locking of its own.</summary>
        private sealed class MinBinaryHeap
        {
            private readonly List<KeyValuePair<TKey, TValue>> _items;

            /// <summary>Creates an empty heap.</summary>
            public MinBinaryHeap()
            {
                _items = new List<KeyValuePair<TKey, TValue>>();
            }

            /// <summary>Creates a heap holding a shallow copy of another heap's entries.</summary>
            /// <param name="heapToCopy">The heap whose entries are copied.</param>
            public MinBinaryHeap(MinBinaryHeap heapToCopy)
            {
                _items = new List<KeyValuePair<TKey, TValue>>(heapToCopy.Items);
            }

            /// <summary>Gets the number of entries in the heap.</summary>
            public int Count
            {
                get { return _items.Count; }
            }

            /// <summary>Gets the backing list, in heap layout order.</summary>
            internal List<KeyValuePair<TKey, TValue>> Items
            {
                get { return _items; }
            }

            /// <summary>Removes all entries.</summary>
            public void Clear()
            {
                _items.Clear();
            }

            /// <summary>Adds an entry and sifts it up to its place.</summary>
            public void Insert(KeyValuePair<TKey,TValue> entry)
            {
                // Append, then walk the vacated slot towards the root while the new key is
                // strictly smaller than the parent's key.
                _items.Add(entry);
                int slot = _items.Count - 1;

                while (slot > 0)
                {
                    int parentSlot = (slot - 1) / 2;
                    var parent = _items[parentSlot];

                    if (entry.Key.CompareTo(parent.Key) >= 0)
                    {
                        break;
                    }

                    _items[slot] = parent;
                    slot = parentSlot;
                }

                _items[slot] = entry;
            }

            /// <summary>Returns the entry with the smallest key without removing it.</summary>
            /// <exception cref="InvalidOperationException">The heap is empty.</exception>
            public KeyValuePair<TKey, TValue> Peek()
            {
                if (_items.Count == 0)
                {
                    throw new InvalidOperationException("The heap is empty.");
                }

                return _items[0];
            }

            /// <summary>Removes and returns the entry with the smallest key.</summary>
            /// <exception cref="InvalidOperationException">The heap is empty.</exception>
            public KeyValuePair<TKey, TValue> Remove()
            {
                if (_items.Count == 0)
                {
                    throw new InvalidOperationException("The heap is empty.");
                }

                KeyValuePair<TKey, TValue> smallest = _items[0];
                int tail = _items.Count - 1;

                if (tail < 2)
                {
                    // One or two entries: dropping the root leaves a trivially valid heap.
                    _items.RemoveAt(0);
                    return smallest;
                }

                // Move the last entry to the root, shrink the list, then restore heap order.
                _items[0] = _items[tail];
                _items.RemoveAt(tail);
                SiftDown(0);

                return smallest;
            }

            /// <summary>Pushes the entry at <paramref name="node"/> down until neither child has a smaller key.</summary>
            private void SiftDown(int node)
            {
                while (true)
                {
                    int left = 2 * node + 1;

                    if (left >= _items.Count)
                    {
                        return; // No children: the entry has reached a leaf.
                    }

                    // Pick the smaller of node/left first, then compare the right child with that pick.
                    int candidate = node;

                    if (_items[left].Key.CompareTo(_items[candidate].Key) < 0)
                    {
                        candidate = left;
                    }

                    int right = left + 1;

                    if (right < _items.Count && _items[right].Key.CompareTo(_items[candidate].Key) < 0)
                    {
                        candidate = right;
                    }

                    if (candidate == node)
                    {
                        return; // Heap order already holds here.
                    }

                    var displaced = _items[node];
                    _items[node] = _items[candidate];
                    _items[candidate] = displaced;

                    node = candidate;
                }
            }
        }
    }
}
Community
  • 1
  • 1
Matthew Watson
  • 104,400
  • 10
  • 158
  • 276