
Consider the following setup:

public class WorkItem
{
    public string Name { get; set; }
}

public class Worker
{
    public async Task DoWork(WorkItem workItem)
    {
        await Task.Delay(1000); //run task
    }
}


public class Engine
{
    private ConcurrentQueue<WorkItem> _workItems = new();
    private List<Worker> _workers = new();

    public Engine(int workers, int threads)
    {
        ConcurrentThreadsForEachWorker = threads;
        _workers = new();
        for (int i = 0; i < workers; i++)
        {
            _workers.Add(new());
        }
    }

    public int ConcurrentThreadsForEachWorker { get; private set; }    

    public async Task RunAsync(CancellationToken token)
    {
        while(!token.IsCancellationRequested)
        {
            //distribute work amongst workers here
        }
    }
}

Let's say the Engine constructor receives 2 for workers and 4 for threads. I want to implement the RunAsync method so that the load is spread equally across the workers, as follows:

WorkItem 1 -> Worker 1 (running 1)
WorkItem 2 -> Worker 2 (running 1)
WorkItem 3 -> Worker 1 (running 2)
WorkItem 4 -> Worker 2 (running 2)
WorkItem 5 -> Worker 1 (running 3)
WorkItem 6 -> Worker 2 (running 3)
WorkItem 7 -> Worker 1 (running 4 - full)
WorkItem 8 -> Worker 2 (running 4 - full)
WorkItem 9 -> Both workers are full, so wait until one of them is free

More Info

Maybe a bit of context would help. I'm trying to process a lot of inboxes (around 1000). The "Worker" is a Microsoft Graph Client and the "WorkItem" is an inbox. Each Graph Client can run 4 queries at a given time. So I want to register a couple of App IDs in the Graph Client and let each of them handle 4 of these inboxes. I basically want to process them as fast as possible.

Microsoft Graph has a subscription feature that can notify the program when a new email arrives. When that notification arrives, I want to add the inbox to the queue, and if the queue has enough room, it should be processed immediately.

I'd also love it if there was a way that we could add an item to the front of the queue (instead of the back). So if there's an urgent change or a high-priority inbox, the clients could process that inbox ASAP.

Alireza Noori
  • Do not mix tasks and threads. That's heading towards disaster. If you want fine-grained control over concurrency, consider [TPL DataFlow](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library). There you could set up a pipeline that bifurcates into two identical blocks with a certain degree of parallelism and joins the results again (if necessary). – Fildor Dec 23 '22 at 18:14
  • I don't want to run them in threads necessarily. I want each worker to have a maximum of n work items, running in parallel. – Alireza Noori Dec 23 '22 at 18:20
  • That's a textbook use case for DataFlow, then. – Fildor Dec 23 '22 at 18:21
  • @Fildor could you please post a small snippet as an answer please? – Alireza Noori Dec 23 '22 at 18:22
  • @Fildor it might be worth mentioning that this queue is dynamic. Meaning, the items could be added to it during the program runtime. – Alireza Noori Dec 23 '22 at 20:13
  • I would suggest to rename the property `ConcurrentThreadsForEachWorker` to something like `MaxConcurrencyPerWorker` or `MaxDegreeOfParallelismPerWorker`, because people freak out when the words *"thread"* and *"task"* are placed in close proximity. :-) – Theodor Zoulias Dec 23 '22 at 23:37
  • Then it is even more a case for DataFlow. – Fildor Dec 24 '22 at 06:21
  • I can try and make a little example but it will take time. Mind the current date... – Fildor Dec 24 '22 at 06:27
  • Regarding the need to add items in front of the queue, you might find this API proposal interesting: [Possible PriorityChannel](https://github.com/dotnet/runtime/issues/62761). In case you want a `PriorityChannel` right now, ask a new question here, and we might come up with a home-made solution. – Theodor Zoulias Dec 24 '22 at 18:52
  • An easy way to prioritize urgent items could be to use two input channels, and [merge](https://stackoverflow.com/questions/70658393/merge-multiple-iasyncenumerable-streams/70659874#70659874) them. The System.Interactive.Async `Merge` operator is [biased](https://stackoverflow.com/questions/70710153/how-to-merge-multiple-asynchronous-sequences-without-left-side-bias) towards the left-side sequences, so you could do this: `var merged = AsyncEnumerableEx.Merge(urgentChannel.Reader.ReadAllAsync(), normalChannel.Reader.ReadAllAsync())`, and then pass the `merged` to the `Parallel.ForEachAsync`. – Theodor Zoulias Dec 25 '22 at 16:07

2 Answers


A simple solution might be to store the workers in a Queue<T> instead of a List<T>, dequeue a worker every time you need one, and enqueue it back immediately:

Queue<Worker> _workers = new();
for (int i = 0; i < workersCount; i++) _workers.Enqueue(new());

ParallelOptions options = new() { MaxDegreeOfParallelism = 10 };
await Parallel.ForEachAsync(workItems, options, async (workItem, ct) =>
{
    Worker worker;
    lock (_workers)
    {
        worker = _workers.Dequeue();
        _workers.Enqueue(worker);
    }
    await worker.DoWork(workItem);
});

This way the workers will be used in a round robin fashion, as an unlimited resource. The MaxConcurrencyPerWorker policy will not be enforced.

If you want to enforce this policy, then you must use them as a limited resource, so enqueue them back in the queue only after the completion of the DoWork operation. You must also enqueue each Worker multiple times in the queue (MaxConcurrencyPerWorker times), in an interleaving manner. You must also deal with the case that the pool of workers has been exhausted, in which case the execution flow will have to be suspended until a worker becomes available. A Queue<T> doesn't offer this functionality. You will need a Channel<T>:

Channel<Worker> workerPool = Channel.CreateUnbounded<Worker>();
for (int i = 0; i < MaxConcurrencyPerWorker; i++)
    foreach (Worker worker in _workers)
        workerPool.Writer.TryWrite(worker);

ParallelOptions options = new() { MaxDegreeOfParallelism = workerPool.Reader.Count };
await Parallel.ForEachAsync(workItems, options, async (workItem, ct) =>
{
    Worker worker = await workerPool.Reader.ReadAsync();
    try
    {
        await worker.DoWork(workItem);
    }
    finally { workerPool.Writer.TryWrite(worker); }
});

The Channel<T> is an asynchronous version of the BlockingCollection<T>. The ChannelReader.ReadAsync method returns a worker synchronously if one is currently stored in the channel, or asynchronously if the channel is currently empty. In the above example ReadAsync will always return a worker synchronously, because the degree of parallelism of the Parallel.ForEachAsync loop has been limited to the total (not distinct) number of available workers.
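
To check the per-worker limit, the Channel<Worker> pool above can be exercised end to end. The following is only a minimal sketch under stated assumptions: the Worker class here is a simplified stand-in that takes a string instead of a WorkItem, and the PeakConcurrency counter, the Demo class and its RunAsync method are hypothetical names added for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Simplified stand-in for the Worker in the question (assumption: string work items).
public class Worker
{
    private readonly object _gate = new();
    private int _running;

    // Highest number of simultaneous DoWork calls observed on this worker.
    public int PeakConcurrency { get; private set; }

    public async Task DoWork(string workItem)
    {
        lock (_gate) { _running++; PeakConcurrency = Math.Max(PeakConcurrency, _running); }
        await Task.Delay(50); // simulated query
        lock (_gate) { _running--; }
    }
}

public static class Demo
{
    public static async Task<int[]> RunAsync(int workersCount, int maxConcurrencyPerWorker, int itemCount)
    {
        List<Worker> workers = Enumerable.Range(0, workersCount).Select(_ => new Worker()).ToList();

        // Store each worker maxConcurrencyPerWorker times, interleaved.
        Channel<Worker> workerPool = Channel.CreateUnbounded<Worker>();
        for (int i = 0; i < maxConcurrencyPerWorker; i++)
            foreach (Worker worker in workers)
                workerPool.Writer.TryWrite(worker);

        IEnumerable<string> workItems = Enumerable.Range(1, itemCount).Select(i => $"Inbox {i}");
        ParallelOptions options = new() { MaxDegreeOfParallelism = workersCount * maxConcurrencyPerWorker };

        await Parallel.ForEachAsync(workItems, options, async (workItem, ct) =>
        {
            // Take a worker reference out of the pool; give it back only after the work is done.
            Worker worker = await workerPool.Reader.ReadAsync(ct);
            try { await worker.DoWork(workItem); }
            finally { workerPool.Writer.TryWrite(worker); }
        });

        return workers.Select(w => w.PeakConcurrency).ToArray();
    }

    public static async Task Main()
    {
        int[] peaks = await RunAsync(workersCount: 2, maxConcurrencyPerWorker: 4, itemCount: 40);
        Console.WriteLine($"Peak concurrency per worker: {string.Join(", ", peaks)}");
    }
}
```

Because each worker is stored exactly maxConcurrencyPerWorker times and is returned to the pool only after DoWork completes, no reported peak should exceed 4 in this configuration.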


Update: The above solution does not guarantee perfect balancing in the long run. It's possible for the workerPool to gradually lose its "interleaving" property, resulting in many references to the same Worker being stored consecutively. For precise control it might be necessary to keep track of the usage statistics of each individual worker. You would need some structure resembling an LRU cache to hold the workers and the statistics, something like an ObjectPool<T> with priority management. Here is what I came up with: a PriorityPool<T> class that is backed by a simple array (instead of something more complex like a dictionary, a sorted set or a priority queue), and is also equipped with a SemaphoreSlim in order to enforce the MaxConcurrencyPerWorker policy.

public class PriorityPool<T> : IDisposable
{
    private struct Entry
    {
        public T Item;
        public int ConcurrencyCount;
        public long LastUseStamp;
    }

    private readonly Entry[] _items;
    private readonly IEqualityComparer<T> _comparer;
    private readonly SemaphoreSlim _semaphore;
    private long _lastUseStamp;

    public int Count { get { return _items.Length; } }

    public PriorityPool(IEnumerable<T> items, int maxConcurrencyPerItem,
        IEqualityComparer<T> comparer = default)
    {
        ArgumentNullException.ThrowIfNull(items);
        if (maxConcurrencyPerItem < 1)
            throw new ArgumentOutOfRangeException(nameof(maxConcurrencyPerItem));
        _items = items.Select(x => new Entry() { Item = x }).ToArray();
        _comparer = comparer ?? EqualityComparer<T>.Default;
        if (_items.Length == 0)
            throw new ArgumentException("No items found.", nameof(items));
        if (_items.DistinctBy(e => e.Item, _comparer).Count() != _items.Length)
            throw new ArgumentException("Duplicate item found.", nameof(items));
        int semaphoreSize = _items.Length * maxConcurrencyPerItem;
        _semaphore = new(semaphoreSize, semaphoreSize);
    }

    public async ValueTask<T> GetAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        lock (_items)
        {
            int index = 0;
            for (int i = 1; i < _items.Length; i++)
            {
                int diff = _items[i].ConcurrencyCount - _items[index].ConcurrencyCount;
                if (diff > 0) continue;
                if (diff < 0 || _items[i].LastUseStamp < _items[index].LastUseStamp)
                    index = i;
            }
            _items[index].ConcurrencyCount++;
            _items[index].LastUseStamp = ++_lastUseStamp;
            return _items[index].Item;
        }
    }

    public void Return(T item)
    {
        lock (_items)
        {
            int index;
            for (index = 0; index < _items.Length; index++)
                if (_comparer.Equals(item, _items[index].Item)) break;
            if (index == _items.Length)
                throw new InvalidOperationException("Item not found.");
            if (_items[index].ConcurrencyCount == 0)
                throw new InvalidOperationException("Negative concurrency.");
            _items[index].ConcurrencyCount--;
        }
        _semaphore.Release();
    }

    public void Dispose() => _semaphore.Dispose();
}

Usage example:

using PriorityPool<Worker> workerPool = new(_workers, MaxConcurrencyPerWorker);

//...

Worker worker = await workerPool.GetAsync();
try
{
    await worker.DoWork(workItem);
}
finally { workerPool.Return(worker); }

The GetAsync method returns the worker with the least concurrency level at the moment. In case of a tie, it returns the least recently used worker.

The PriorityPool<T> class is thread-safe, with the exception of Dispose, which must be used only when all other operations on the PriorityPool<T> have completed (behavior inherited from the SemaphoreSlim).
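
Regarding the wish in the question to put urgent inboxes at the front of the queue, one possible home-made approach is to combine the .NET 6 PriorityQueue<TElement, TPriority> with a SemaphoreSlim. This is only a rough sketch and not part of the solution above: AsyncPriorityQueue<T> and Demo are hypothetical names, not BCL types, and the convention that lower priority values are served first is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not a BCL type): a minimal asynchronous priority queue.
public sealed class AsyncPriorityQueue<T>
{
    // Lower priority values are dequeued first. The sequence number keeps
    // FIFO order among items that share the same priority.
    private readonly PriorityQueue<T, (int Priority, long Sequence)> _queue = new();
    private readonly SemaphoreSlim _available = new(0);
    private long _sequence;

    public void Enqueue(T item, int priority)
    {
        lock (_queue) _queue.Enqueue(item, (priority, _sequence++));
        _available.Release(); // signal that one more item can be dequeued
    }

    public async ValueTask<T> DequeueAsync(CancellationToken cancellationToken = default)
    {
        // Wait asynchronously until at least one item has been enqueued.
        await _available.WaitAsync(cancellationToken).ConfigureAwait(false);
        lock (_queue) return _queue.Dequeue();
    }
}

public static class Demo
{
    public static async Task<List<string>> RunAsync()
    {
        var queue = new AsyncPriorityQueue<string>();
        queue.Enqueue("Inbox A", priority: 1);
        queue.Enqueue("Inbox B", priority: 1);
        queue.Enqueue("Urgent inbox", priority: 0); // enqueued last, served first

        var order = new List<string>();
        for (int i = 0; i < 3; i++) order.Add(await queue.DequeueAsync());
        return order;
    }

    public static async Task Main()
    {
        // prints: Urgent inbox -> Inbox A -> Inbox B
        Console.WriteLine(string.Join(" -> ", await RunAsync()));
    }
}
```

A consumer could call DequeueAsync to obtain the next work item and then GetAsync on the PriorityPool<T> to obtain a worker. The priority only affects items that are still waiting in the queue, not items that are already being processed.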

Theodor Zoulias
  • Yes, it was a wrong choice of name. Thank you very much for a great answer. I'll give this a try but I'm pretty sure it's what I want. – Alireza Noori Dec 24 '22 at 00:35
  • I gave this a try but there's a problem I'm facing. I need to be able to enqueue items as they are processing. The `Parallel.ForEachAsync` throws an exception because the collection is being modified. How can we fix that? – Alireza Noori Dec 24 '22 at 16:24
  • @AlirezaNoori you could consider using a `Channel` as the source of the `Parallel.ForEachAsync` method. To enqueue an item: `channel.Writer.TryWrite(item)`. To consume and process the items: `await Parallel.ForEachAsync(channel.Reader.ReadAllAsync(), ...`. To create the channel: `var channel = Channel.CreateUnbounded<WorkItem>();`. The `Channel` is thread-safe, and supports multiple concurrent writers and readers. – Theodor Zoulias Dec 24 '22 at 16:46
  • Sorry, my bad. The queue was a normal one rather than a `ConcurrentQueue`. It was left from one of my changesets. But now the problem is, let's say we have 2 items in the queue. It will process those items but can't process the new incoming work items unless these two are finished. As to your suggestion, actually, my first try was to use a Channel, but the problem is that I want to process multiple items in parallel. How can we do that? I know how to read a single item from the channel, but how to have something like `Parallel.ForEachAsync` on a `Channel`? – Alireza Noori Dec 24 '22 at 16:52
  • I added more info. Please read the updated question. – Alireza Noori Dec 24 '22 at 17:09
  • @AlirezaNoori you can't use a normal `Queue` as the conveyer belt in a producer-consumer scenario. You need a thread-safe queue with either blocking or async capabilities, and the most readily available tool for this purpose is the `Channel`. A `BufferBlock` from the TPL Dataflow can also do the job, and there are third-party tools as well (I have the AsyncEx library in mind), but the `Channel` is the best. It doesn't support reordering its internal buffer though, and I don't know of any tool that offers this feature. – Theodor Zoulias Dec 24 '22 at 17:42
  • Is it possible you could give an example for the `Channel` version? I can't figure out how to process multiple jobs at once. – Alireza Noori Dec 24 '22 at 18:18
  • @AlirezaNoori check out [this question](https://stackoverflow.com/questions/73128372/how-to-run-a-parallel-foreachasync-loop-with-nobuffering "How to run a Parallel.ForEachAsync loop with NoBuffering?"). There is an example at the bottom of the question (the `Processor` class). – Theodor Zoulias Dec 24 '22 at 18:29
  • @AlirezaNoori I updated the answer with a more sophisticated, but also more complex solution. – Theodor Zoulias Dec 24 '22 at 18:45
  • Thanks. Based on the post you sent, I updated my implementation and it's now working flawlessly. I'm sure your implementation is great too so I'll check that one too but essentially, I changed from a `ConcurrentQueue` for the `WorkItems` to a `Channel` and used a "Fake" infinite source for the `Parallel.ForEachAsync` and inside the body, I just read the next item from the Channel and execute it. Pretty straightforward. – Alireza Noori Dec 24 '22 at 19:22
  • @AlirezaNoori yep, there are lots of tricks you can do with these tools. To be honest though this whole mechanism (channel+`ForEachAsync`) is essentially an [`ActionBlock`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.actionblock-1) from the TPL Dataflow library. Currently I am not very enthusiastic about this library because I discovered quite a lot of rough corners (including reported bugs that still remain unsolved for years), but I know of no issue with using a single `ActionBlock` that is not linked to other blocks. – Theodor Zoulias Dec 24 '22 at 19:32

As promised:

It got a little more complicated than I thought, especially after the requirement of "VIP" messages was added.

Disclaimer:

  1. This is not Production-Ready. It is just a rough scribble.
  2. I do not say this is in any shape or form "better" than the accepted answer. It is just an alternative which may have some quirks.

Code:

using System;
using System.Linq;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Example Driver
public class Program
{
    private static Random rnd = new Random();
    
    public static async Task Main()
    {
        IEngine engine = new Engine();
        
        // Simulate some traffic
        for( int i = 0; i < 100; i++ )
        {
            await Simulate(rnd, engine);
        }
        
        // Wait until all messages have passed the process.
        await engine.Shutdown();
    }
    
    // "Simulator"
    public static int _workItems = 0;
    public static async Task Simulate(Random rnd, IEngine engine)
    {
        // Create WorkItem
        var workItem = new WorkItem( $"WorkItem {_workItems++:000}", TimeSpan.FromMilliseconds(rnd.Next(750,999)));
        var isPriority = rnd.Next(100) < 10; // 10% probability for a VIP message
        await Task.Delay(TimeSpan.FromMilliseconds(rnd.Next(0,50))); // Simulate incoming jitter
        await (isPriority 
            ? engine.ProcessVIPAsync(workItem)
            : engine.ProcessAsync(workItem));
    }
}

// WorkItem-related: Name and Effort (to simulate actual work done)
public record WorkItem (string Name, TimeSpan Effort);

// Interfaces for simulated Client and Engine
public interface IClient
{
    Task ProcessAsync(WorkItem message);
}

public interface IEngine
{
    Task ProcessAsync(WorkItem message);
    Task ProcessVIPAsync(WorkItem message);
    Task Shutdown(); // Graceful shutdown
}

public interface IEngineBlock
{
    Task ProcessAsync(WorkItem message);
    Task Shutdown(); // Graceful shutdown
}


// Implementations
public class Engine : IEngine
{
    // 2 Clients with 4 Slots each for "normal" processing
    private readonly IEngineBlock _normalEngine = new EngineBlock(2);
    // 1 Client with 4 Slots for VIP processing, assuming a probability of ~10%
    private readonly IEngineBlock _prioEngine   = new EngineBlock(1);

    public Task ProcessAsync(WorkItem message)
    {
        return _normalEngine.ProcessAsync(message);
    }
    
    public Task ProcessVIPAsync(WorkItem message)
    {
        return _prioEngine.ProcessAsync(message);
    }
    
    public Task Shutdown()
    {
        return Task.WhenAll(_normalEngine.Shutdown(), _prioEngine.Shutdown());
    }
    
    private class EngineBlock : IEngineBlock
    {
        private readonly List<ClientWrapper> _clients = new();
        private readonly Func<int> _iterator;
        
        public EngineBlock(int clientCount)
        {
            for( int i = 0; i < clientCount; i++ ) _clients.Add(new ClientWrapper());
            _iterator = clientCount > 1 ? NextIndex : () => 0;
        }
        
        public Task ProcessAsync(WorkItem message)
        {
            return NextClient().ProcessAsync(message);
        }
        
        public Task Shutdown()
        {
            return Task.WhenAll(_clients.Select(x => x.Shutdown()).ToList());
        }

        private ClientWrapper NextClient()
        {
            return _clients[_iterator()];
        }
        
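        private int _currentIndex = -1;
        private int NextIndex()
        {
            // Interlocked.Increment makes the read-modify-write atomic, so concurrent
            // callers cannot observe the same index. The uint cast keeps the result
            // non-negative if the counter ever wraps around.
            return (int)((uint)Interlocked.Increment(ref _currentIndex) % (uint)_clients.Count);
        }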

        private class ClientWrapper
        {
            // TPL DataFlow =>
            private readonly ActionBlock<WorkItem> _actionBlock;
            private readonly IClient _client;
            
            public ClientWrapper()
            {
                 _client = new SimulatedClient();
                 _actionBlock = new ActionBlock<WorkItem>(ProcessAtClientAsync, new ExecutionDataflowBlockOptions{ MaxDegreeOfParallelism = 4 });
                 // If you want a push-back on the client when
                 // slots are occupied, you can configure the
                 // input buffer with
                 // BoundedCapacity = 4
            }
            
            private Task ProcessAtClientAsync( WorkItem message ) => _client.ProcessAsync(message);
            
            public Task ProcessAsync(WorkItem message) => _actionBlock.SendAsync(message);
            
            public Task Shutdown()
            {
                _actionBlock.Complete();
                return _actionBlock.Completion;
            }
        }
    }
    
    public class SimulatedClient : IClient
    {
        private static int count = 0;
        public string Name { get; } = $"Client {++count}";
    
        private int _slots = 4;
    
        public async Task ProcessAsync(WorkItem message)
        {
            var slots = Interlocked.Decrement(ref _slots);
            await Task.Delay(message.Effort);
            Console.WriteLine($"{Name} Processed {message.Name} | Slots free: {slots}");
            Interlocked.Increment(ref _slots);
        }
    }
}

In action: https://dotnetfiddle.net/AVHF0i

I ran this in a local project with 100 items sent casually, then a burst of 500, and then another 200 sent casually, with ~10% probability of a VIP item.

Where ...

"casually" means: the offset between sends was between 500 and 1999 ms, with a 50:50 chance of 0 ms

"burst" means: the offset between sends was between 25 and 50 ms, with a 50:50 chance of 0 ms

"Effort" is not counted as part of "runtime"; it was between 750 and 1250 ms.

That resulted in:

╭─────────────────┬────────────────────────╮
│Items processed  │                  800   │
│Priority Items   │                   92   │
│Threads used     │                    6   │
│Clients (N/P)    │         2 / 1          │
├─────────────────┼────────────────────────┤
│  Client e9c4    │                  354   │
│  Client 5f2d    │                   92   │
│  Client da2c    │                  354   │
├─────────────────┼────────────────────────┤
│Avg Norm Runtime │   29,802119538700556 s │
│Avg Prio Runtime │    3,631634751086957 s │
│Avg Norm Wait    │   29,794613347316393 s │
│Avg Prio Wait    │   3,6240866793478275 s │
╰─────────────────┴────────────────────────╯
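
Regarding the BoundedCapacity comment in the ClientWrapper constructor above, here is a rough sketch of the push-back it refers to. It is not part of the benchmarked code: the Demo class, the integer messages and the 500 ms delay are assumptions made for illustration. With BoundedCapacity = 4 and MaxDegreeOfParallelism = 4, Post should decline a fifth message while four are still in flight, whereas SendAsync waits asynchronously until a slot frees up.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Demo
{
    public static async Task<(int Accepted, bool FifthPosted, bool FifthSent)> RunAsync()
    {
        var block = new ActionBlock<int>(
            async _ => await Task.Delay(500), // simulated query (assumed duration)
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4, // 4 slots per client, as in the answer
                BoundedCapacity = 4         // limit on buffered plus in-flight messages
            });

        // Post never waits: it returns false when the block cannot accept the message.
        int accepted = 0;
        for (int i = 1; i <= 4; i++)
            if (block.Post(i)) accepted++;

        // All four slots are busy, so this Post is expected to be declined.
        bool fifthPosted = block.Post(5);

        // SendAsync completes once the block has room and accepts the message.
        bool fifthSent = await block.SendAsync(5);

        block.Complete();
        await block.Completion; // graceful shutdown, as in ClientWrapper.Shutdown
        return (accepted, fifthPosted, fifthSent);
    }

    public static async Task Main()
    {
        var (accepted, fifthPosted, fifthSent) = await RunAsync();
        Console.WriteLine($"Accepted: {accepted}, fifth Post: {fifthPosted}, fifth SendAsync: {fifthSent}");
    }
}
```

Since ClientWrapper.ProcessAsync already uses SendAsync, setting BoundedCapacity there would make callers wait for a free slot instead of letting the input buffer grow without limit.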
Fildor