As I mentioned in my comment above, you can create your own wrapper around a queue that manages background processors of your queue and re-queues the tasks as they complete.
In addition, you mentioned the need to dynamically add or remove tasks at will, which the below implementation will handle.
Finally, it accepts an external `CancellationToken`, so you can either call `Stop()` on the processor itself or cancel the parent `CancellationTokenSource`.
/// <summary>
/// Runs a number of background worker loops over a shared queue of task names.
/// Each worker dequeues a name, invokes the supplied callback for it, and then
/// puts the name back on the queue unless it has been flagged for removal.
/// </summary>
public class QueueProcessor
{
    // Names flagged for removal. A flag is consumed by the first worker that
    // observes it, so only one queued occurrence of a name is dropped per request.
    // Could be replaced with a ref-count solution to ensure all duplicated
    // tasks are removed.
    private readonly HashSet<string> _tasksToRemove = new();
    private readonly ConcurrentQueue<string> _taskQueue;
    private readonly Func<string, CancellationToken, Task> _processorCallback;
    private Task[] _processors;
    private CancellationTokenSource _cts;

    /// <summary>Creates a processor pre-loaded with <paramref name="tasks"/>.</summary>
    /// <param name="tasks">Initial task names to place on the queue.</param>
    /// <param name="processorCallback">Invoked once per dequeued task name with the processor's cancellation token.</param>
    /// <exception cref="ArgumentNullException"><paramref name="processorCallback"/> is null.</exception>
    public QueueProcessor(
        string[] tasks,
        Func<string, CancellationToken, Task> processorCallback)
    {
        _taskQueue = new(tasks);
        _processorCallback = processorCallback ?? throw new ArgumentNullException(nameof(processorCallback));
    }

    /// <summary>
    /// Starts the worker loops and completes once all of them have exited,
    /// which happens after <see cref="Stop"/> is called or
    /// <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="numberOfProcessorThreads">How many worker loops to run concurrently.</param>
    /// <param name="cancellationToken">External token that also stops the workers when cancelled.</param>
    public async Task StartAsync(int numberOfProcessorThreads,
        CancellationToken cancellationToken = default)
    {
        var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        _cts = cts;

        // Capture the token once: reading CancellationTokenSource.Token after the
        // source has been disposed throws, whereas an already-issued token stays usable.
        var token = cts.Token;

        _processors = new Task[numberOfProcessorThreads];
        for (int i = 0; i < _processors.Length; i++)
        {
            _processors[i] = Task.Run(() => ProcessQueueAsync(token));
        }

        try
        {
            await Task.WhenAll(_processors);
        }
        finally
        {
            // Dispose only after every worker has finished using the token source.
            cts.Dispose();
        }
    }

    /// <summary>
    /// Requests that all workers stop. Safe to call before <see cref="StartAsync"/>,
    /// more than once, or after the workers have already finished.
    /// </summary>
    public void Stop()
    {
        try
        {
            _cts?.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // The workers already finished and StartAsync disposed the source.
        }
    }

    /// <summary>Flags <paramref name="task"/> so that a worker drops it instead of (re)processing it.</summary>
    public void RemoveTask(string task)
    {
        lock (_tasksToRemove)
        {
            _tasksToRemove.Add(task);
        }
    }

    /// <summary>Appends <paramref name="task"/> to the queue.</summary>
    public void AddTask(string task) => _taskQueue.Enqueue(task);

    // Worker loop: process and re-queue items until cancellation is requested.
    private async Task ProcessQueueAsync(CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                if (!_taskQueue.TryDequeue(out var task))
                {
                    // sleep for a bit before checking for more work
                    await Task.Delay(1000, token);
                    continue;
                }

                if (ShouldTaskBeRemoved(task))
                {
                    continue;
                }

                await _processorCallback(task, token);

                // Check again: removal may have been requested while the callback ran.
                if (!ShouldTaskBeRemoved(task))
                {
                    _taskQueue.Enqueue(task);
                }
            }
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            // Cancellation is the normal shutdown path, not a failure.
        }
    }

    // Returns true (and clears the flag) when the task was flagged via RemoveTask.
    private bool ShouldTaskBeRemoved(string task)
    {
        lock (_tasksToRemove)
        {
            if (_tasksToRemove.Remove(task))
            {
                Console.WriteLine($"Task {task} requested for removal");
                return true;
            }
        }
        return false;
    }
}
You can test the above with the following:
/// <summary>
/// Sample processor callback: waits 50 ms, then prints whether the command
/// was executed or terminated, depending on the state of the token.
/// </summary>
/// <param name="command">Name of the command being processed.</param>
/// <param name="cancellationToken">Token inspected after the delay to choose the message.</param>
public async Task MyFunction(string command, CancellationToken cancellationToken)
{
    // The delay is not given the token, so execution always reaches the check below.
    await Task.Delay(50);

    Console.WriteLine(cancellationToken.IsCancellationRequested
        ? $"Terminating command: {command}"
        : $"Execute command: {command}");
}
// Demo: run two workers over three tasks, remove "Task1" after 100 ms,
// then cancel everything 500 ms later and wait for the workers to finish.
using var cts = new CancellationTokenSource();
var processor = new QueueProcessor(
    new string[] { "Task1", "Task2", "Task3" },
    MyFunction);

// Keep the task returned by StartAsync so shutdown can be awaited below.
var runningProcessorTask = processor.StartAsync(2, cts.Token);

await Task.Delay(100);
processor.RemoveTask("Task1");

await Task.Delay(500);
cts.Cancel();

await runningProcessorTask;
This results in the following output:
Execute command: Task2
Execute command: Task1
Execute command: Task3
Execute command: Task2
Task Task1 requested for removal
Execute command: Task3
Execute command: Task2
Execute command: Task2
Execute command: Task3
Execute command: Task3
Execute command: Task2
Execute command: Task2
Execute command: Task3
Execute command: Task3
Execute command: Task2
Execute command: Task2
Execute command: Task3
Execute command: Task2
Execute command: Task3
Terminating command: Task2
Terminating command: Task3
If you would prefer a `Channel<T>`-backed version that waits for additional work gracefully without a manual `Task.Delay`, the following version exposes the same public API without the internal `ConcurrentQueue<T>`.
/// <summary>
/// Channel-backed variant: runs a number of background worker loops that read
/// task names from an unbounded channel, invoke the supplied callback for each,
/// and write the name back to the channel unless it has been flagged for removal.
/// </summary>
public class QueueProcessor
{
    // Names flagged for removal. A flag is consumed by the first worker that
    // observes it, so only one queued occurrence of a name is dropped per request.
    // Could be replaced with a ref-count solution to ensure all duplicated tasks are removed.
    private readonly HashSet<string> _tasksToRemove = new();
    private readonly Channel<string> _taskQueue;
    private readonly Func<string, CancellationToken, Task> _processorCallback;
    private Task[] _processors;
    private CancellationTokenSource _cts;

    /// <summary>Creates a processor pre-loaded with <paramref name="tasks"/>.</summary>
    /// <param name="tasks">Initial task names to place on the channel.</param>
    /// <param name="processorCallback">Invoked once per task name read, with the processor's cancellation token.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public QueueProcessor(string[] tasks, Func<string, CancellationToken, Task> processorCallback)
    {
        if (tasks is null)
        {
            throw new ArgumentNullException(nameof(tasks));
        }

        _processorCallback = processorCallback ?? throw new ArgumentNullException(nameof(processorCallback));
        _taskQueue = Channel.CreateUnbounded<string>();

        foreach (var task in tasks)
        {
            // TryWrite avoids discarding the ValueTask that WriteAsync would return.
            _taskQueue.Writer.TryWrite(task);
        }
    }

    /// <summary>
    /// Starts the worker loops and completes once all of them have exited,
    /// which happens after <see cref="Stop"/> is called or
    /// <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="numberOfProcessorThreads">How many worker loops to run concurrently.</param>
    /// <param name="cancellationToken">External token that also stops the workers when cancelled.</param>
    public async Task StartAsync(int numberOfProcessorThreads, CancellationToken cancellationToken = default)
    {
        var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        _cts = cts;

        // Capture the token once: reading CancellationTokenSource.Token after the
        // source has been disposed throws, whereas an already-issued token stays usable.
        var token = cts.Token;

        _processors = new Task[numberOfProcessorThreads];
        for (int i = 0; i < _processors.Length; i++)
        {
            _processors[i] = Task.Run(() => ProcessQueueAsync(token));
        }

        try
        {
            await Task.WhenAll(_processors);
        }
        finally
        {
            // Dispose only after every worker has finished using the token source.
            cts.Dispose();
        }
    }

    /// <summary>
    /// Marks the channel as complete and requests that all workers stop.
    /// Safe to call before <see cref="StartAsync"/>, more than once, or after
    /// the workers have already finished.
    /// </summary>
    public void Stop()
    {
        _taskQueue.Writer.TryComplete();
        try
        {
            _cts?.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // The workers already finished and StartAsync disposed the source.
        }
    }

    /// <summary>Flags <paramref name="task"/> so that a worker drops it instead of (re)processing it.</summary>
    public void RemoveTask(string task)
    {
        lock (_tasksToRemove)
        {
            _tasksToRemove.Add(task);
        }
    }

    /// <summary>Writes <paramref name="task"/> to the channel.</summary>
    public ValueTask AddTask(string task) => _taskQueue.Writer.WriteAsync(task);

    // Worker loop: process and re-queue items until cancellation or channel completion.
    private async Task ProcessQueueAsync(CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested && await _taskQueue.Reader.WaitToReadAsync(token))
            {
                // Another worker may have taken the item between the wait and the read.
                if (!_taskQueue.Reader.TryRead(out var task))
                {
                    continue;
                }

                if (ShouldTaskBeRemoved(task))
                {
                    continue;
                }

                await _processorCallback(task, token);

                // Check again: removal may have been requested while the callback ran.
                if (!ShouldTaskBeRemoved(task))
                {
                    // TryWrite returns false instead of throwing once Stop() has
                    // completed the writer, so an in-flight item is simply dropped.
                    _taskQueue.Writer.TryWrite(task);
                }
            }
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            // Cancellation is the normal shutdown path, not a failure.
        }
    }

    // Returns true (and clears the flag) when the task was flagged via RemoveTask.
    private bool ShouldTaskBeRemoved(string task)
    {
        lock (_tasksToRemove)
        {
            if (_tasksToRemove.Remove(task))
            {
                Console.WriteLine($"Task {task} requested for removal");
                return true;
            }
        }
        return false;
    }
}