
I'm limited to .NET 3.5 and I'm using the TPL. The scenario is producer-consumer, but there is no blocking problem. PLINQ cannot be used in this scenario (because of its limitations). What we want to achieve is the fastest way to produce many items (each production is long-running, and the number of items exceeds 100,000), but each item must be consumed in FIFO order: the first item I asked to be produced must be consumed first, even if it was created after other items. Consumption should also be as fast as possible.

For this problem I tried using a task list: wait for the first task in the list to complete (`taskList.First().IsCompleted`) and then run the consuming function on its result. But for some reason I seem to run out of memory (maybe too many items in the task list, because of tasks waiting to start?). Is there a better way to do that? (I'm trying to achieve the fastest result possible.)
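Roughly, what I tried looks like this (a simplified sketch; `ProduceItem`, `Consume`, and the item count stand in for my real producer, consumer, and workload):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class Sketch
{
    // Stand-ins for the real long-running producer and the consumer.
    static int ProduceItem(int i) { Thread.Sleep(10); return i; }
    static void Consume(int item) { Console.WriteLine("Consumed: " + item); }

    public static void Main()
    {
        // Every task is created and queued up front; with 100,000+ items
        // this list (plus the pending thread-pool work) keeps growing.
        var taskList = new List<Task<int>>();
        for (int i = 0; i < 100; i++)
        {
            int copy = i; // capture a copy of the loop variable
            taskList.Add(Task.Factory.StartNew(() => ProduceItem(copy)));
        }

        // FIFO consumption: always wait on the head (oldest) task first.
        while (taskList.Count > 0)
        {
            taskList[0].Wait();
            Consume(taskList[0].Result);
            taskList.RemoveAt(0);
        }
    }
}
```

Since nothing bounds how many tasks exist at once, memory grows with the item count, which matches the out-of-memory symptom I'm seeing.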

Many thanks!

Mano
  • You either want work done in parallel or in order; you cannot have both, and it doesn't even make sense to want both. There's also absolutely no gain in creating a Task just for spinning up parallel work – Camilo Terevinto Feb 01 '18 at 21:42
  • In this example the things run in parallel were short and took about the same time to run, but that's not the case in the scenario I'm asking about, @CamiloTerevinto. As for that task: in my actual case the parallel work is called on a different thread, which is why I did that (also, I can't run the consumer simultaneously without doing that) – Mano Feb 01 '18 at 22:07
  • Why use a blocking collection at all? Why not just a normal array, where you put the results into the array as each task completes. When done, the array is populated. – MineR Feb 01 '18 at 23:34
  • @commenters: The point of the blocking collection is to block an Add when the collection is full. This is common in a producer/consumer pattern where it is much more work to deal with `TryAdd` or something similar. And because the call blocks, it *has* to be done in parallel; that is the whole point. – John Wu Feb 02 '18 at 00:05
  • Thanks @JohnWu for explaining. MineR, the reason is that I want to block the additions. I'm dealing with a huge amount of calls and I don't want to run out of memory, that's why storing the results in an array would be problematic for me (too much information to store before proceeding). In case of an array I thought about setting the task as null after consuming the result, but blocking collection fits the problem better. – Mano Feb 02 '18 at 09:10
  • Mano, note that @JohnWu 's solution also does not solve the issue of blocking the tasks. His solution will also enqueue up an unlimited number of tasks - the blocking collection will not stop a new task from starting. My solution as written will not work either for you, but it can be fixed by using MaxDegreeOfParallelism on the Parallel.For and an array only 1 size greater than MaxDegreeOfParallelism (with a % to put things in the right slot). JohnWu's looks like it can be sorted with just a MaxDegreeOfParallelism too. – MineR Feb 03 '18 at 02:07
  • As I worked on the solution it occurred to me that it might not be a great idea to use your thread pool as a queue. Any reason you don't just increase the capacity of the BlockingCollection? – John Wu Feb 03 '18 at 08:13
  • @MineR I was mistaken and didn't understand the problem. There isn't actually a problem of blocking, only a problem of FIFO order in consuming (FIFO in terms of the order the producer was called). I've updated the question accordingly and I'm still looking for help :) – Mano Feb 03 '18 at 09:17
  • @Mano, I suggest rewording the entire question to match your real problem - that way the question can be found by others. – MineR Feb 03 '18 at 11:36

2 Answers

6

OK, after the edit: instead of adding the results to the BlockingCollection, add the Tasks themselves to the blocking collection. This way the items are processed in order AND there is a maximum degree of parallelism, which prevents too many tasks from kicking off and eating up all your memory.

https://dotnetfiddle.net/lUbSqB

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;

public class Program
{
    private static BlockingCollection<Task<int>> BlockingCollection {get;set;}  

    public static void Producer(int numTasks)
    {
        Random r = new Random(7);
        for (int i = 0; i < numTasks; i++)
        {
            int closured = i; // capture a copy of the loop variable
            Task<int> task = new Task<int>(() =>
            {
                Thread.Sleep(r.Next(100)); // simulate a long-running production
                Console.WriteLine("Produced: " + closured);
                return closured;
            });
            BlockingCollection.Add(task); // blocks while the collection is full, throttling production
            task.Start();                 // start the task only once it has been admitted
        }
        BlockingCollection.CompleteAdding();
    }

    public static void Main()
    {
        int numTasks = 20;
        int maxParallelism = 3;

        // the bounded capacity doubles as the maximum degree of parallelism
        BlockingCollection = new BlockingCollection<Task<int>>(maxParallelism);

        Task.Factory.StartNew(() => Producer(numTasks));

        // tasks come out in the order they were added, so consumption is FIFO
        foreach (var task in BlockingCollection.GetConsumingEnumerable())
        {
            task.Wait();
            Console.WriteLine("              Consumed: " + task.Result);
            task.Dispose();
        }
    }
}

And the results:

Produced: 0
              Consumed: 0
Produced: 1
              Consumed: 1
Produced: 3
Produced: 2
              Consumed: 2
              Consumed: 3
Produced: 4
              Consumed: 4
Produced: 6
Produced: 5
              Consumed: 5
              Consumed: 6
Produced: 7
              Consumed: 7
Produced: 8
              Consumed: 8
Produced: 10
Produced: 9
              Consumed: 9
              Consumed: 10
Produced: 12
Produced: 13
Produced: 11
              Consumed: 11
              Consumed: 12
              Consumed: 13
Produced: 15
Produced: 14
              Consumed: 14
              Consumed: 15
Produced: 17
Produced: 16
Produced: 18
              Consumed: 16
              Consumed: 17
              Consumed: 18
Produced: 19
              Consumed: 19
MineR
  • Sadly, this makes the BlockingCollection's functionality irrelevant, since it forces me to create a task array as big as the number of items, which is problematic for speed (new Task N times); instead of blocking, it simply stores everything in the array. – Mano Feb 02 '18 at 09:14
  • Seems really really great and fits my problem, many many thanks! – Mano Feb 03 '18 at 12:10
3

I thought this was an interesting question so I spent a bit of time on it.

The scenario as I understand it is this:

  1. You have a BlockingCollection that is full
  2. A number of threads start, each trying to add to the BlockingCollection. These calls will all block; that is why they need to occur in parallel.
  3. As space becomes available, the Add calls will become unblocked.
  4. The calls to Add need to complete in the order they were received.

First of all, let's talk about code structure. Instead of using a BlockingCollection and writing procedural code around it, I suggest extending BlockingCollection and wrapping its Add method in the functionality you need. It might look something like this:

public class QueuedBlockingCollection<T> : BlockingCollection<T>
{
    private FifoMonitor monitor = new FifoMonitor();

    public QueuedBlockingCollection(int max) : base (max) {}

    public void Enqueue(T item)
    {
        using (monitor.Lock())
        {
            base.Add(item);
        }
    }
}

Here, the trick is the use of a FifoMonitor class, which will give you the functionality of a lock but will enforce order. Unfortunately, no class like that exists in the CLR. But we can write one:

public class FifoMonitor
{
    public class FifoCriticalSection : IDisposable
    {
        private readonly FifoMonitor _parent;

        public FifoCriticalSection(FifoMonitor parent)
        {
            _parent = parent;
            _parent.Enter();
        }

        public void Dispose()
        {
            _parent.Exit();
        }
    }

    private object _innerLock = new object();
    private volatile int counter = 0;
    private volatile int current = 1;

    public FifoCriticalSection Lock()
    {
        return new FifoCriticalSection(this);
    }

    private void Enter()
    {
        int mine = Interlocked.Increment(ref counter);
        Monitor.Enter(_innerLock);
        while (current != mine) Monitor.Wait(_innerLock);
    }

    private void Exit()
    {
        Interlocked.Increment(ref current);
        Monitor.PulseAll(_innerLock);
        Monitor.Exit(_innerLock);
    }
}

Now to test. Here's my program:

public class Program
{
    public static void Main()
    {
        //Setup
        var blockingCollection = new QueuedBlockingCollection<int>(10);
        var tasks = new Task[10];

        //Block the collection by filling it up
        for (int i=1; i<=10; i++) blockingCollection.Add(99);

        //Start 10 threads all trying to add another value
        for (int i=1; i<=10; i++)
        {
            int index = i; // capture a copy of the loop variable
            tasks[index-1] = Task.Run( () => blockingCollection.Enqueue(index) );
            Task.Delay(100).Wait();  //Wait long enough for the Enqueue call to block
        }

        //Purge the collection, making room for more values
        while (blockingCollection.Count > 0)
        {
            var n = blockingCollection.Take();
            Console.WriteLine(n);
        }

        //Wait for our pending adds to complete
        Task.WaitAll(tasks);

        //Display the collection in the order read
        while (blockingCollection.Count > 0)
        {
            var n = blockingCollection.Take();
            Console.WriteLine(n);
        }

    }
}

Output:

99
99
99
99
99
99
99
99
99
99
1
2
3
4
5 
6
7
8
9
10

Looks like it works! But just to be sure, I changed Enqueue back to Add to check that the solution actually does something. Sure enough, with the regular Add the items end up out of order:

99
99
99
99
99
99
99
99
99
99
2
3
4
6
1
5
7
8
9
10

Check out the code on DotNetFiddle

John Wu