I am trying to come up with a "good" design for the following situation:
- I have a set of distinct, inherently IO bound operations. They happen to be database operations.
- Each member in the set must be repeated indefinitely.
- Each time a member executes, it does some internal calculation which determines how long to wait before executing it again. This is a sort of "auto-tuning" to prevent unnecessarily hammering the database with queries.
- There may be up to a few dozen members in the set, but never approaching a large number like 100 or more.
- I want to limit the number that can run in parallel to some "reasonable" amount, say 4.
- The precision of the next execution time can be quite low, and does not have to be strictly honoured.
I am aware that in principle this setup of running 4 at a time over a set of dozens could mean that execution of the operations "falls behind", for example when many operations want to repeat very quickly but only 4 can ever run at the same time. However, you can take it as given that there will always be periods of relative inactivity where operations will indicate a significant delay before needing to be run again, allowing other members to "catch up".
I have a solution that "works", but given my general ignorance of async and threading, I'm sure there exist better ways. My research into this quickly exploded into a bewildering number of options using SemaphoreSlim, custom TaskScheduler implementations, and several more.
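For reference, this is roughly the shape most of those examples take: a fixed batch of tasks throttled by a SemaphoreSlim and awaited once (simplified here, with Task.Delay standing in for the query):

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class ThrottleExample
{
    static async Task Main()
    {
        var throttle = new SemaphoreSlim(4); // max degree of parallelism
        var tasks = Enumerable.Range(0, 20).Select(async i =>
        {
            await throttle.WaitAsync(); // at most 4 tasks get past this point at once
            try
            {
                Console.WriteLine($"running {i}");
                await Task.Delay(1000); // stand-in for the database query
            }
            finally
            {
                throttle.Release();
            }
        }).ToList();
        // "WaitAll" semantics: the queue is drained once and then everything is finished
        await Task.WhenAll(tasks);
    }
}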
The thing that really threw me is that virtually all of the examples I could find seem to assume the desire for WaitAll style semantics, with some queue of tasks which is drained in parallel according to a max degree of parallelism until it is empty. In my case, however, operations need to be continually recycled into the "queue", and only executed after some given elapsed duration.
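If I bend that pattern toward my requirement, I imagine each operation would own its own endless loop, sleeping for its self-reported delay and taking a throttle slot only around the actual database call. Something like this sketch (the RecyclingWorker class and everything in it is purely illustrative, not code I'm actually running):

using System;
using System.Threading;
using System.Threading.Tasks;

class RecyclingWorker
{
    private static readonly SemaphoreSlim Throttle = new SemaphoreSlim(4); // cap on concurrent database calls
    private static readonly Random Rng = new Random();

    private readonly int id;

    public RecyclingWorker(int id)
    {
        this.id = id;
    }

    // Each worker owns its own endless loop: wait out its delay, then take a
    // throttle slot only for the duration of the "database" call.
    public async Task RunAsync(CancellationToken ct)
    {
        var nextDelay = TimeSpan.Zero;
        try
        {
            while (true)
            {
                await Task.Delay(nextDelay, ct);   // wait until it's time to run again
                await Throttle.WaitAsync(ct);      // at most 4 workers hit the database at once
                try
                {
                    Console.WriteLine($"executing task {id}");
                    await Task.Delay(1000, ct);    // stand-in for the stored procedure
                }
                finally
                {
                    Throttle.Release();
                }
                int seconds;
                lock (Rng) { seconds = Rng.Next(0, 10); } // stand-in for the delay the procedure reports back
                nextDelay = TimeSpan.FromSeconds(seconds);
            }
        }
        catch (OperationCanceledException)
        {
            // shutdown was requested; let the task complete normally
        }
    }
}

One difference from my code below is that here the waiting itself doesn't occupy one of the 4 slots, only the database call does; I'm not sure whether that is closer to or further from what I actually want.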
My question, then, is whether the code I have as my proof of concept is so bad as to be dangerous, or merely bad in terms of the pattern it uses, or just bad in terms of performance. The last of these I am least concerned about, given that any of the database operations will likely take on the order of seconds to complete, so a few milliseconds spent in the controlling code here or there doesn't really matter. But I certainly don't want to be moronically inefficient if a relatively comprehensible and more efficient solution exists.
For my own edification, I am particularly curious as to whether the pattern below is going to cause a large amount of context switching, or other overheads of that sort.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;
namespace ConsoleApp2
{
    class Worker
    {
        // shared so that workers created in quick succession don't all get identical seeds
        private static readonly Random rng = new Random();

        private CancellationToken ct;
        public int id; // just something to provide meaningful console output
        public bool working;
        public DateTime nextExecutionTime; // defaults to DateTime.MinValue, so the first run happens immediately

        public Worker(int id, CancellationToken ct)
        {
            this.ct = ct;
            this.id = id;
            this.working = false;
        }

        public async Task DoWorkAsync()
        {
            int delay = 0;
            try
            {
                working = true;
                Console.WriteLine($"executing task {id}");
                delay = (int)nextExecutionTime.Subtract(DateTime.Now).TotalMilliseconds;
                delay = delay < 0 ? 0 : delay;
                // wait until it's time to "hit the database"
                await Task.Delay(delay, ct);
                // run the inherently IO bound operation, e.g. a database query, simulated here with a delay
                await Task.Delay(1000 + delay, ct);
                // simulate the calculated delay until the next execution - this value is actually provided as output from the stored procedure doing the work
                nextExecutionTime = DateTime.Now.AddSeconds(rng.Next(0, 10));
            }
            catch (TaskCanceledException) { }
            finally { working = false; }
        }
    }
    class Program
    {
        private static async Task RunWorkers(List<Worker> runnables, CancellationToken ct)
        {
            var running = new List<Task>();
            // any random 4 to start off, it doesn't matter which
            running.AddRange(runnables.Take(4).Select(r => r.DoWorkAsync()));
            while (!ct.IsCancellationRequested)
            {
                Task t = await Task.WhenAny(running);
                running.Remove(t);
                // this seems like a very inefficient way to handle scheduling:
                // pick the idle worker whose next execution time is soonest
                Worker next = runnables.Where(r => !r.working).OrderBy(r => r.nextExecutionTime).First();
                running.Add(next.DoWorkAsync());
            }
            // let the in-flight workers observe the cancellation before returning
            await Task.WhenAll(running);
        }

        static void Main(string[] args)
        {
            List<Worker> runnables = new List<Worker>();
            var cts = new CancellationTokenSource();
            for (int i = 0; i < 20; i++)
            {
                runnables.Add(new Worker(i, cts.Token));
            }
            // RunWorkers returns a Task (rather than async void) so Main can actually wait for the loop to finish
            Task t = RunWorkers(runnables, cts.Token);
            Console.ReadKey();
            cts.Cancel();
            t.GetAwaiter().GetResult();
        }
    }
}