I've come across a problem that I can readily define, but for the life of me I can't work out the best solution from the MSDN documentation. It's been a while since I had to actually think about parallel processing outside of UI responsiveness.

Say I have a concurrent collection of tasks that need to be processed. For example, the tasks might load data into various consumers by type (Consumer1, Consumer2, Consumer3 ... Consumer[N]). The underlying operation of sending the data is the same for every task, but each consumer can accept only one source at a time.

Basically, I want to process as much in parallel as possible with the caveat that I can only send 1 task to each consumer at a time. So if a current Job for a Consumer is already in progress then I should move to the next item in the collection and leave it for when the Job in progress for that consumer has completed. The Concurrent collection could also be added to at any time externally and if we had new types we'd need additional threads.

I guess what my question boils down to is how do I customize the "Take" from the collection so that I only grab the next Task with a property that designates it has a Consumer that doesn't already have a Job in progress.

Any ideas on what I'm missing here or if I'm even on the right path?

Example: we have a mediator queue with tasks associated with banking transactions.

We might add the following to our mediator queue (let's say SendSummaryData and SendTransactionData use the same interface contract to send data):

  1. SendTransactionData -> Bank1
  2. SendTransactionData -> Bank2
  3. SendSummaryData -> Arbiter
  4. SendTransactionData -> Bank1
  5. SendTransactionData -> Bank3
  6. SendTransactionData -> Bank1
  7. SendTransactionData -> Bank2

Items 1, 2, 3 and 5 can be processed in parallel. However, because of limitations in their own systems, each consumer can accept only one input at a time: transaction 4 must wait for transaction 1 to complete, and transaction 6 must wait for transaction 4. Similarly, transaction 7 must wait for transaction 2.

Before any of the initial tasks have completed, someone may add another group:

  8. SendSummaryData -> Arbiter
  9. SendTransactionData -> Bank1
  10. SendTransactionData -> Bank4

10 can be picked up immediately if a thread is available, but 8 and 9 must be queued behind their other related tasks.
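
To make the rule concrete, here is a minimal sketch that computes which queued items may start right away. The `QueueExample` class and the `StartableNow` method are hypothetical names used only for illustration, and the items are modelled as plain `(Id, Consumer)` tuples rather than real task objects.

```csharp
using System;
using System.Collections.Generic;

public static class QueueExample
{
    // Returns the ids of the queued items that may start right away: the first
    // queued item for each consumer, provided that consumer is not already busy.
    public static List<int> StartableNow(
        IEnumerable<(int Id, string Consumer)> queued,
        IEnumerable<string> busyConsumers)
    {
        var blocked = new HashSet<string>(busyConsumers);
        var startable = new List<int>();
        foreach (var (id, consumer) in queued)
        {
            // HashSet.Add returns false when the consumer is already present,
            // i.e. it is busy or an earlier queued item has claimed it.
            if (blocked.Add(consumer)) startable.Add(id);
        }
        return startable;
    }
}
```

With items 1 to 7 queued and no consumer busy, this yields 1, 2, 3 and 5. With Bank1, Bank2, Arbiter and Bank3 busy and items 4, 6, 7, 8, 9 and 10 queued, it yields only 10.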

Obviously there would be better ways to design a system to accomplish this but these are essentially the specs I'm looking to satisfy.

  • Would it be possible to include in the question an actual scenario of how this mechanism could be used in practice? What kind of processing is it intended to perform? – Theodor Zoulias Feb 05 '22 at 18:08
  • I'll add this sure. I can't necessarily outline exactly my specific scenario but I will attempt to genericize it. – PurpleMonkeyDiswasher Feb 05 '22 at 18:13
  • Let me rephrase your problem. You have a stream of incoming objects that need to be processed, and all these objects can be processed with unlimited parallelism. But the objects have also a property `Bank`, and objects with the same `Bank` are not allowed to be processed in parallel, and need to be serialized. Is this description correct? – Theodor Zoulias Feb 05 '22 at 19:19
  • After processing the objects, is the processing of each object complete or follows some post-processing operation? I am asking because I would like to suggest a TPL Dataflow approach, and I don't know if I should show a custom [`ActionBlock`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.actionblock-1) (final processing) or a custom [`TransformBlock`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.transformblock-2) (intermediate processing that can be linked to some other processor downstream). – Theodor Zoulias Feb 05 '22 at 19:42
  • Actually I have already posted such an implementation [here](https://stackoverflow.com/questions/57022754/send-parallel-requests-but-only-one-per-host-with-httpclient-and-polly-to-gracef/62111743#62111743 "Send parallel requests but only one per host with HttpClient") (`CreateExclusivePerKeyTransformBlock`/`CreateExclusivePerKeyActionBlock` methods). – Theodor Zoulias Feb 05 '22 at 19:51
  • In reality the only thing that needs to be tracked is object completion so I believe that final processing at least as I currently understand the use case is not necessary unless final processing is needed to update the concurrent collection. – PurpleMonkeyDiswasher Feb 05 '22 at 19:53
  • I actually think the implementation you linked might be just what I'm looking for. I may have to be mindful of the number of `SemaphoreSlim` instances, but I'll test the initial implementation and see what happens. – PurpleMonkeyDiswasher Feb 05 '22 at 19:59
  • By final processing I mean actual work that needs to be done on the business objects. Like step-2 after step-1. The management of the buffers and the synchronization primitives should be separated from the business processing, for easier testing and maintenance. – Theodor Zoulias Feb 05 '22 at 20:11
  • Yup no actual work that needs to be done and I can't think of any potential future requirements that would require it so I think it's safe to omit final processing. – PurpleMonkeyDiswasher Feb 05 '22 at 20:16
  • In that case you have the option to simplify the [linked](https://stackoverflow.com/questions/57022754/send-parallel-requests-but-only-one-per-host-with-httpclient-and-polly-to-gracef/62111743#62111743) implementation, and replace the internal `var block = new TransformBlock` with a `var block = new ActionBlock`. It would be slightly more efficient than emitting dummy `null` values, and then throwing them away in a `DataflowBlock.NullTarget`. – Theodor Zoulias Feb 05 '22 at 20:22
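
As a rough illustration of the per-key semaphore idea mentioned in the comments, here is a minimal sketch. It is not the linked implementation and does not use TPL Dataflow. The `PerKeyGate` class and its `RunExclusiveAsync` method are hypothetical names, and the sketch assumes that keeping one `SemaphoreSlim` per key for the lifetime of the gate is acceptable.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: serializes asynchronous work per key,
// while work for different keys may run concurrently.
public sealed class PerKeyGate<TKey> where TKey : notnull
{
    // One semaphore per key. Entries are never removed in this sketch,
    // so the dictionary grows with the number of distinct keys.
    private readonly ConcurrentDictionary<TKey, SemaphoreSlim> _gates = new();

    public async Task RunExclusiveAsync(TKey key, Func<Task> action)
    {
        SemaphoreSlim gate = _gates.GetOrAdd(key, _ => new SemaphoreSlim(1, 1));
        await gate.WaitAsync().ConfigureAwait(false);
        try
        {
            await action().ConfigureAwait(false);
        }
        finally
        {
            gate.Release(); // Always release, even if the action throws
        }
    }
}
```

This sketch only guarantees mutual exclusion per key. If strict first-in-first-out ordering per consumer matters, that should be verified or enforced separately rather than assumed from the semaphore.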

2 Answers

Here is an approach that is not based on the TPL Dataflow library. It is based on the `Parallel.ForEachAsync` API instead (available from .NET 6 onwards). The custom `ForEachExclusivePerKeyAsync` method below supports all the options and functionality of the `Parallel.ForEachAsync` overload that takes an `IAsyncEnumerable<T>` as its source, and its behavior in case of errors or cancellation is identical. The only difference is that concurrent operations for elements with the same key are prevented: the key of each element is obtained via a `keySelector` function, and the processing of items with the same key is serialized.

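using System;
using System.Collections.Generic;
using System.Runtime.InteropServices; // CollectionsMarshal
using System.Threading;
using System.Threading.Tasks;

// Being an extension method, this must be declared inside a static class.
/// <summary>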
/// Executes a for-each operation on an async-enumerable sequence in which
/// iterations may run concurrently, enforcing a non-concurrent execution policy
/// for elements having the same key.
/// </summary>
public static Task ForEachExclusivePerKeyAsync<TSource, TKey>(
    this IAsyncEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Func<TSource, CancellationToken, ValueTask> body,
    Func<TSource, TKey> keySelector,
    IEqualityComparer<TKey> keyComparer = default)
{
    ArgumentNullException.ThrowIfNull(keySelector);
    // The other arguments are validated by the Parallel.ForEachAsync itself.
    Dictionary<TKey, Queue<TSource>> perKey = new(keyComparer);
    return Parallel.ForEachAsync(source, parallelOptions, async (item, ct) =>
    {
        TKey key = keySelector(item);
        lock (perKey)
        {
            // If there is no other task in-flight with the same key,
            // insert a null queue as an indicator of activity,
            // and start a processing loop for items with this key.
            // Otherwise enqueue this item and return.
            Queue<TSource> queue = CollectionsMarshal.GetValueRefOrAddDefault(
                perKey, key, out bool exists) ??= (exists ? new() : null);
            if (queue is not null)
            {
                queue.Enqueue(item); return;
            }
        }

        // Fire the task for this item, and for all other items with the
        // same key that might be queued while this task is in-flight.
        while (!ct.IsCancellationRequested)
        {
            await body(item, ct); // Continue on captured context
            lock (perKey)
            {
                Queue<TSource> queue = perKey[key];
                if (queue is null || queue.Count == 0)
                {
                    perKey.Remove(key); break;
                }
                item = queue.Dequeue();
            }
        }
    });
}

Usage example. A Channel<T> is used as the source/controller of the IAsyncEnumerable<T> sequence:

var channel = Channel.CreateUnbounded<Transaction>();
//...
var options = new ParallelOptions() { MaxDegreeOfParallelism = 20 };
await ForEachExclusivePerKeyAsync(channel.Reader.ReadAllAsync(), options, async (x, _) =>
{
    await ProcessTransactionAsync(x);
}, keySelector: x => x.Bank);
//...
channel.Writer.TryWrite(new Transaction() { Bank = "Bank1" });
channel.Writer.TryWrite(new Transaction() { Bank = "Bank2" });

The ForEachExclusivePerKeyAsync implementation above uses the CollectionsMarshal.GetValueRefOrAddDefault method for updating the perKey dictionary, improving the performance at the cost of readability. For a less performant but more readable version you can look at the 4th revision of this answer.

For a version of the same method that does not depend on the Parallel.ForEachAsync API, and so it can run on .NET versions earlier than 6, you can look at the 3rd revision of this answer.

Theodor Zoulias

If your problem is determining whether or not a task has already been picked up by another consumer, you need to perform this check on task retrieval.

That is, you probably have some code somewhere that looks something like:

Task next = queue.GetNextTask();

You'll need to update your queue to know 2 things:

  1. what tasks are currently in progress, and,
  2. whether or not there are any not-in-progress tasks.

If the queue has (or can have) access to the pool of threads/processes, you can inspect the pool for #1. This is the best option, if available. #2 is more complicated than with a typical FIFO queue: starting at the head, inspect each element and return it if it's available, or move on to the next element if not.

If the queue can't access the pool, you'll need to decorate your Task objects with a flag indicating whether that task is in progress. Flags like this (semaphores, really) are difficult and irritating to manage, but are sometimes the only option. You have to make sure you're locking appropriately and that anyone who attempts to update the flag respects the lock. You also have to deal with failure scenarios, like what happens if a thread or process dies while processing a task.

Either way, you can then update your code to:

if (queue.HasAvailableTask()) {
  Task next = queue.GetNextAvailableTask();
  // Process task.
} else {
  // No task to process, this thread should sleep or die.
}

Similarly, if one task is dependent upon the completion of another, the queue's inspector (like HasAvailableTask()) must be able to determine somehow if a task may be processed immediately, or must wait until the completion of another task. If it must wait, skip it and move on to check the next task in the queue.
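
Calling `HasAvailableTask()` and then `GetNextAvailableTask()` as two separate steps can race when several threads do it at once, so one option is to fold both into a single locked operation. The following is a minimal sketch of that idea, assuming exclusion by a key such as the consumer name. The `ExclusivePerKeyQueue` class and its members are hypothetical names, not part of any library.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;

// Hypothetical helper: a queue whose "take" skips items whose key is busy.
public sealed class ExclusivePerKeyQueue<TItem, TKey>
{
    private readonly object _gate = new();
    private readonly LinkedList<TItem> _pending = new();
    private readonly HashSet<TKey> _inProgress = new();
    private readonly Func<TItem, TKey> _keySelector;

    public ExclusivePerKeyQueue(Func<TItem, TKey> keySelector)
        => _keySelector = keySelector ?? throw new ArgumentNullException(nameof(keySelector));

    public void Enqueue(TItem item)
    {
        lock (_gate) _pending.AddLast(item);
    }

    // Scans from the head and removes the first item whose key is idle,
    // marking that key as busy inside the same critical section.
    public bool TryTakeNextAvailable([MaybeNullWhen(false)] out TItem item)
    {
        lock (_gate)
        {
            for (var node = _pending.First; node is not null; node = node.Next)
            {
                if (_inProgress.Add(_keySelector(node.Value)))
                {
                    item = node.Value;
                    _pending.Remove(node);
                    return true;
                }
            }
        }
        item = default;
        return false;
    }

    // Must be called when processing ends, including on failure,
    // otherwise the key stays blocked forever.
    public void MarkCompleted(TItem item)
    {
        lock (_gate) _inProgress.Remove(_keySelector(item));
    }
}
```

Because the scan starts at the head, items with the same key are taken in the order they were enqueued. The scan is linear in the number of pending items, which may matter for long queues.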

Edit: checking for other tasks of the same type

Essentially, you have to encapsulate some code (in the right place) by which the task runner can decide if the queue has any items in it that are available to run right now.

One good way to do this (there are others) is to decorate the process/thread pool with some inspectors. That is, look among the running tasks to see if there are any (or how many, if necessary) tasks that match your exclusion criteria. What those criteria are doesn't matter much to this description; they're just your business logic. Add as many or as few as you need.

The queue, similarly, should have an inspector to expose if there are any (or how many, if necessary) waiting tasks.

When you check to see if there is a task to run, match up all the criteria. If there are any tasks remaining in the queue that haven't been excluded, there is work to do and another task can be scheduled.

If your criteria are complicated, you may consider returning, from the pool and from the queue separately, some sort of lightweight task descriptor that contains nothing but a key (to find the task later) and enough information to make all the decisions you need to. Consider:

public sealed class TaskDescriptor {
  public string Id { get; set; } // Change the data type to match your task ID
  public string Type { get; set; } // What type of task this is
  public string Source { get; set; } // Either "runner" or "queue"
}

Fill a list from the runner and the queue, then filter. For example, you can look for Source = "runner" where Type matches a particular type, to determine whether another task of that type is currently running.
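
A minimal sketch of that filter, assuming a hypothetical `Descriptor` record with the same three fields as the descriptor described above. The `DescriptorFilter` class and `Runnable` method are illustrative names only.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical descriptor with the same three fields as described above.
public sealed record Descriptor(string Id, string Type, string Source);

public static class DescriptorFilter
{
    // Returns the queued descriptors whose type is not currently running.
    public static List<Descriptor> Runnable(IEnumerable<Descriptor> all)
    {
        var list = all.ToList();

        // Types that already have a task in the runner.
        var runningTypes = list
            .Where(d => d.Source == "runner")
            .Select(d => d.Type)
            .ToHashSet();

        // Queued tasks of any other type are candidates to run now.
        return list
            .Where(d => d.Source == "queue" && !runningTypes.Contains(d.Type))
            .ToList();
    }
}
```

The result can contain several queued descriptors of the same type, so the caller would still schedule only the first one per type.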

Stephan Samuel
  • It is slightly more complicated than that. Not only do I need to know whether or not a Task is in progress but I also need to know if there are ANY tasks of the same type that are currently in progress. – PurpleMonkeyDiswasher Feb 05 '22 at 20:03