I'm trying to run Parallel.ForEach on my Priority Queue but I am getting the following error:

Error CS0411: The type arguments for method 'Parallel.ForEach<TSource>(OrderablePartitioner<TSource>, ParallelOptions, Action<TSource, ParallelLoopState, long>)' cannot be inferred from the usage. Try specifying the type arguments explicitly. TPL_POC.PL

I know how to execute Parallel.ForEach with IEnumerable and Lists, but I have had no luck with the following.

private void ProcessTasksParallely()
{
    PriorityQueue<string, int> activeTasksPriority = new PriorityQueue<string, int>();
    foreach (var task in this.tasks)
    {
        activeTasksPriority.Enqueue(task.Task, task.Id);
    }
    Console.WriteLine("Processing");

    var options = new ParallelOptions { MaxDegreeOfParallelism = (Environment.ProcessorCount / 2) * 10 };

    Parallel.ForEach(activeTasksPriority.TryDequeue(out string t, out int priority),
        options,
        (t, priority) =>
        {
            Console.WriteLine($" task {priority}, task = {t}, thread = {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(100);
        });
}

I am trying this because I need to process tasks in parallel, but in the order of the priority with which they were scheduled.

Theodor Zoulias
  • What are you trying to do? `Parallel.ForEach` isn't built for pub/sub scenarios, it's built for data parallelism - processing a ton of in-memory data by partitioning it and using roughly one worker task per core to process each partition with minimal synchronization. A `PriorityQueue` isn't something used in data parallelism problems - if you want the results to be ordered, you'd use PLINQ and an `OrderBy` clause. – Panagiotis Kanavos Mar 15 '22 at 09:18
  • In networking and messaging, priority processing is performed through *multiple queues* not a single priority queue. Priorities inevitably change the perceived order of items and the queue state, which is a big no-no for concurrency. You could use multiple Channel instances as asynchronous queues and use a different number of workers to process each queue eg using `Parallel.ForEachAsync` with a different MaxDOP per queue. Or you could retrieve items in order, processing all items in the high-priority queue before reading from the next – Panagiotis Kanavos Mar 15 '22 at 09:25
  • Finally, you can't just use an `IEnumerable<>` with `Parallel.ForEach`, because the default partitioner *buffers* items. If you have 8 cores a new high-priority item may have to wait behind 8 or more low-priority items that are already buffered for processing. You'd have to explicitly disable buffering – Panagiotis Kanavos Mar 15 '22 at 10:21
  • @PanagiotisKanavos, I understand what you're trying to say and I fully agree. But what I am trying to do is this: suppose I have multiple tasks scheduled and each task has a priority assigned to it, let's say Task A with priority 1, Task B with 2, Task C with 3, and Task D with 4, and my DOP is 3. What I want to do is pick Tasks A, B, and C and execute them in parallel, and then pick Task D as soon as one of those tasks finishes. – Nihal Singh Mar 15 '22 at 10:22
  • `Parallel.ForEach` just isn't built for that scenario. Its partitioning and buffering behavior would cause problems. You'd have to use some advanced options to disable it. Multiple queues would offer predictable behavior. `Parallel.ForEachAsync` would be another option, since it doesn't use partitioning or buffering *and* allows the use of asynchronous methods – Panagiotis Kanavos Mar 15 '22 at 10:32
  • What are the *actual* semantics? How heavy is the processing, how frequent the items? Should low-priority items be processed *only* if there are no high-priority items? This has a sync overhead. Or is it OK to give more resources to high-priority items? That's how high-frequency messaging systems work. – Panagiotis Kanavos Mar 15 '22 at 10:39
  • @PanagiotisKanavos I don't understand what you mean by messaging systems. My concern is related to services that need to be run in parallel according to their priorities. – Nihal Singh Mar 15 '22 at 10:44
  • That's an example. High-frequency systems prefer independent queues over a single one. If you have few items and heavy processing per item, that's not your case. OTOH scheduling libraries also use multiple queues if they need to scale out – Panagiotis Kanavos Mar 15 '22 at 10:48
  • If you want to schedule jobs, why not use HangFire or Coravel? They've already solved this problem – Panagiotis Kanavos Mar 15 '22 at 10:51

2 Answers

The PriorityQueue<TElement, TPriority> class does not offer a way to consume it as an IEnumerable out of the box. It only has an UnorderedItems property, which is not what you want. This property yields the contents of the queue without consuming them, and in no particular order. It is easy though to implement a custom GetConsumingEnumerable method for the PriorityQueue<TElement, TPriority> class, like this:

/// <summary>
/// Gets an enumerable sequence that consumes the elements of the queue
/// in an ordered manner.
/// </summary>
public static IEnumerable<(TElement Element, TPriority Priority)>
    GetConsumingEnumerable<TElement, TPriority>(
    this PriorityQueue<TElement, TPriority> source)
{
    while (source.TryDequeue(out TElement element, out TPriority priority))
    {
        yield return (element, priority);
    }
}

Usage example:

var partitioner = Partitioner.Create(activeTasksPriority.GetConsumingEnumerable(),
    EnumerablePartitionerOptions.NoBuffering);

Parallel.ForEach(partitioner, options, entry =>
{
    var (t, priority) = entry;
    Console.WriteLine($"Priority: {priority}, Task: {t}");
    Thread.Sleep(100);
});

The intention of the Partitioner.Create+NoBuffering is to prevent the Parallel.ForEach from consuming elements in advance and storing them into a buffer, before it's ready to process them.
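Putting the extension method and the usage together, a minimal self-contained sketch could look like the one below. It assumes .NET 6 or later. The names `PriorityQueueExtensions`, `Demo`, and `Run`, as well as the sample data, are hypothetical and exist only for illustration. `Run` records the order in which the loop body starts handling each item.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class PriorityQueueExtensions
{
    // Dequeues items one by one, lowest priority value first.
    public static IEnumerable<(TElement Element, TPriority Priority)>
        GetConsumingEnumerable<TElement, TPriority>(
        this PriorityQueue<TElement, TPriority> source)
    {
        while (source.TryDequeue(out TElement element, out TPriority priority))
        {
            yield return (element, priority);
        }
    }
}

public static class Demo
{
    // Hypothetical helper: returns the priorities in the order
    // in which the loop body picked them up.
    public static List<int> Run(int maxDegreeOfParallelism)
    {
        var queue = new PriorityQueue<string, int>();
        queue.Enqueue("C", 3);
        queue.Enqueue("A", 1);
        queue.Enqueue("D", 4);
        queue.Enqueue("B", 2);

        var started = new ConcurrentQueue<int>();

        // NoBuffering: each worker takes one item at a time from the queue.
        var partitioner = Partitioner.Create(
            queue.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism
        };

        Parallel.ForEach(partitioner, options, entry =>
        {
            started.Enqueue(entry.Priority);
            Thread.Sleep(100); // simulated work
        });

        return new List<int>(started);
    }
}
```

Calling `Demo.Run(1)` yields the priorities in the order 1, 2, 3, 4, because a single worker dequeues one item at a time. With a higher degree of parallelism, the items that are in flight at the same time can start in any order relative to each other.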

Note: This answer deals with the simple scenario presented in the question, where the PriorityQueue<E,P> is fully populated before starting the parallel loop. If you want to add more items to the queue while the loop is running, you can't use a PriorityQueue<E,P> directly, for two reasons:

  1. It's not a thread-safe collection.
  2. It doesn't have blocking capabilities, so the loop might complete prematurely before all items are processed.

If you are dealing with such a scenario, you could take a look at this question: Concurrent collection with priority.

Theodor Zoulias
  • This can easily invert priorities for a couple of reasons. First, all worker tasks may be busy with low priority items when a new high priority item gets posted. Second, the default partitioner used by `Parallel.ForEach` uses buffering, which means a high priority item may have to wait for *multiple* low-priority items to complete before it can get processed – Panagiotis Kanavos Mar 15 '22 at 10:19
  • Thanks for the answer but @PanagiotisKanavos is right. The priorities got inverted and it behaved essentially like an Enumerable without any concern of the priority. – Nihal Singh Mar 15 '22 at 10:26
  • @PanagiotisKanavos the code posted by the OP shows a priority queue that is fully populated before starting the `Parallel.ForEach` loop. My answer covers this scenario only. Enqueueing more elements in the queue while the parallel loop is running, is not a supported scenario for the `GetConsumingEnumerable` extension method. The `PriorityQueue` is not a thread-safe collection. – Theodor Zoulias Mar 15 '22 at 10:29
  • As the OP explained, the scenario wasn't covered and the priorities got inverted. If the items were known in advance there would be no need for a PriorityQueue – Panagiotis Kanavos Mar 15 '22 at 10:33
  • @NihalSingh be aware that the `Parallel.ForEach` method invokes the action multiple times simultaneously. So, with `MaxDegreeOfParallelism = 10` for example, the first 10 elements of the queue will be dequeued immediately and will be processed in parallel. Regarding these 10 elements, the concept of order does not apply between them. It is entirely possible that the processing of the element #10 may start before the element #1. It is even theoretically possible that the last element in the queue will be completed before starting the first element. That's the nature of parallelism. – Theodor Zoulias Mar 15 '22 at 11:06

If you want to implement priority in a pub/sub scenario, both Parallel.ForEach and PriorityQueue<TElement, TPriority> are bad choices.

  • Parallel.ForEach is built for data parallelism: processing a ton of in-memory data by partitioning it and using roughly one worker task per core to process each partition with minimal synchronization. A PriorityQueue isn't needed here. If you want a specific order, you can impose it using e.g. PLINQ and OrderBy.
  • Priorities inevitably change the perceived order of items and the queue state, which is a big no-no for concurrency.
  • Priorities can get inverted. All worker tasks may be busy processing low-priority items while a new high-priority item is waiting. Worse, the default partitioner used by Parallel.ForEach buffers items. This means that a new high-priority item may have to wait for multiple low-priority items. You'd have to use Partitioner.Create with an option to disable buffering.
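As an illustration of the PLINQ and OrderBy remark in the first point, here is a minimal sketch. The class name `PlinqOrderingSketch`, the method `ProcessOrdered`, and the upper-casing "work" are hypothetical placeholders. Note that this orders the results by priority; it does not guarantee the order in which individual items start executing.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PlinqOrderingSketch
{
    // Processes the items in parallel and returns the results
    // ordered by priority (lowest value first).
    public static List<string> ProcessOrdered(
        IEnumerable<(string Task, int Priority)> items)
    {
        return items
            .AsParallel()
            .OrderBy(item => item.Priority)
            // Placeholder for the real per-item work.
            .Select(item => $"{item.Priority}:{item.Task.ToUpperInvariant()}")
            .ToList();
    }
}
```

For example, passing `new[] { ("c", 3), ("a", 1), ("b", 2) }` returns the list `1:A`, `2:B`, `3:C`.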

In high-throughput networking and messaging, priority processing is performed through multiple queues, not a single priority queue. Higher-priority queues get more resources or are processed before lower-priority queues.

One queue per priority class

This is how highly scalable messaging systems work, because it doesn't require any synchronization to determine which item to process next.

One way to implement this strategy would be to use multiple ActionBlock instances, each with a different number of worker tasks:

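// Requires: using System.Threading.Tasks.Dataflow;

// One block per priority class.
private ActionBlock<string> _highQueue, _midQueue, _lowQueue;

async Task ProcessMessage(string msg) {...}

ExecutionDataflowBlockOptions WithDop(int dop) => new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = dop
};

void BuildQueues()
{
    // Higher-priority queues get more worker tasks.
    _highQueue = new ActionBlock<string>(ProcessMessage, WithDop(4));
    _midQueue = new ActionBlock<string>(ProcessMessage, WithDop(2));
    _lowQueue = new ActionBlock<string>(ProcessMessage, WithDop(1));
}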

public void Process(string msg,int priority)
{
    // Route the message to the block that matches its priority class.
    var queue = priority switch {
          0 => _highQueue,
          1 => _midQueue,
          _ => _lowQueue
    };
    queue.Post(msg);
}

async Task Complete()
{
    _highQueue.Complete();
    _midQueue.Complete();
    _lowQueue.Complete();
    await Task.WhenAll(
        _highQueue.Completion, 
        _midQueue.Completion, 
        _lowQueue.Completion
    );
}

In this case Process uses pattern matching to route the message to the appropriate ActionBlock.
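The comments under the question also mention Channel instances combined with `Parallel.ForEachAsync` as an alternative to ActionBlock. A rough sketch of that variant, assuming .NET 6 or later, might look like this. The class name `ChannelPriorityProcessor`, the handler parameter, and the worker counts (4, 2, 1, mirroring the ActionBlock example) are assumptions made for illustration, not part of the original answer.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public sealed class ChannelPriorityProcessor
{
    // One unbounded channel per priority class.
    private readonly Channel<string> _high = Channel.CreateUnbounded<string>();
    private readonly Channel<string> _mid = Channel.CreateUnbounded<string>();
    private readonly Channel<string> _low = Channel.CreateUnbounded<string>();
    private readonly Task _workers;

    public ChannelPriorityProcessor(Func<string, ValueTask> handler)
    {
        // Higher-priority channels get more concurrent workers.
        _workers = Task.WhenAll(
            Consume(_high, 4, handler),
            Consume(_mid, 2, handler),
            Consume(_low, 1, handler));
    }

    private static Task Consume(
        Channel<string> channel, int dop, Func<string, ValueTask> handler)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = dop };
        // Runs until the channel is completed and fully drained.
        return Parallel.ForEachAsync(
            channel.Reader.ReadAllAsync(),
            options,
            (msg, cancellationToken) => handler(msg));
    }

    public void Process(string msg, int priority)
    {
        var channel = priority switch
        {
            0 => _high,
            1 => _mid,
            _ => _low
        };
        // Returns false only if the channel has already been completed.
        channel.Writer.TryWrite(msg);
    }

    public Task Complete()
    {
        _high.Writer.Complete();
        _mid.Writer.Complete();
        _low.Writer.Complete();
        return _workers;
    }
}
```

As with the ActionBlock version, a caller would post messages through `Process` and await `Complete()` once no more messages are expected.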

Guru Stron
Panagiotis Kanavos