I am trying to come up with a "good" design for the following situation:

  • I have a set of distinct, inherently IO bound operations. They happen to be database operations.
  • Each member in the set must be repeated indefinitely.
  • Each time a member executes, it does some internal calculation which determines how long to wait before executing it again. This is a sort of "auto-tuning" to prevent unnecessarily hammering the database with queries.
  • There may be up to a few dozen members in the set, but never approaching a large number like 100 or more
  • I want to limit the number that can run in parallel to some "reasonable" amount, say 4.
  • The precision of the next execution time can be quite low, and does not have to be strictly honoured.

I am aware that, in principle, this setup of running 4 at a time over a set of dozens could mean execution of the operations could "fall behind", for example when many operations want to repeat very quickly but only 4 can ever run at the same time. However, you can take it as given that there will always be periods of relative inactivity where operations will indicate a significant delay before needing to be run again, allowing other members to "catch up".

I have a solution that "works", but given my general ignorance of async and threading, I'm sure there exist better ways. My research into this quickly exploded into a bewildering number of options using SemaphoreSlim, custom TaskScheduler implementations, and several more.

The thing that really threw me is that virtually all of the examples I could find seem to assume the desire for WaitAll style semantics, with some queue of tasks which is drained in parallel according to a max degree of parallelism until it is empty. In my case, however, operations need to be continually recycled into the "queue", and only executed after some given elapsed duration.

My question, then, is whether the code I have as my proof of concept is so bad as to be dangerous, or merely bad in terms of the pattern it uses, or just bad in terms of performance. The last of these I am least concerned about, given that any of the database operations will likely run on the order of seconds to complete, so a few milliseconds spent in the controlling code here or there doesn't really matter. But I certainly don't want to be moronically inefficient if a relatively comprehensible and more efficient solution exists.

For my own edification, I am particularly curious as to whether the pattern below is going to cause a large amount of context switching, or other overheads of that sort.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;

namespace ConsoleApp2
{
    class Worker
    {
        private CancellationToken ct;
        public int id;          // just something to provide meaningful console output 
        public bool working;
        public DateTime nextExecutionTime; 
        public Worker(int id, CancellationToken ct) 
        { 
            this.ct = ct; 
            this.id = id;
            this.working = false;
            
        }
        public async Task DoWorkAsync()
        {
            int delay = 0;
            try
            {
                working = true;
                Console.WriteLine($"executing task {id}");
                delay = (int)nextExecutionTime.Subtract(DateTime.Now).TotalMilliseconds;
                delay = delay < 0 ? 0 : delay;
                // wait until it's time to "hit the database"
                await Task.Delay(delay, ct); 
                // run inherently IO based operation, eg a database query, simulated here with delay
                await Task.Delay(1000 + delay, ct);
                // simulate calculated delay until next execution - this value actually provided as output from the stored procedure doing the work
                nextExecutionTime = DateTime.Now.AddSeconds(new Random().Next() % 10);  
            }
            catch (TaskCanceledException) { }
            finally { working = false; }
        }
    }

    class Program
    {
        // async Task rather than async void, so that Main can actually wait for the loop to finish
        private static async Task RunWorkers(List<Worker> runnables, CancellationToken ct)
        {
            var running = new List<Task>();
            // any random 4 to start off, it doesn't matter which
            running.AddRange(runnables.Take(4).Select(r => r.DoWorkAsync())); 
            while (!ct.IsCancellationRequested)
            {
                Task t = await Task.WhenAny(running);
                running.Remove(t);
                // this seems like a very inefficient way to handle scheduling
                Worker next = runnables.Where(r => !r.working).OrderBy(r => r.nextExecutionTime).First();
                running.Add(next.DoWorkAsync());
            }
        }

        static void Main(string[] args)
        {
            List<Worker> runnables = new List<Worker>();
            var cts = new CancellationTokenSource();
            for(int i = 0; i < 20; i++) 
            { 
                runnables.Add(new Worker(i, cts.Token)); 
            }
            Task t = Task.Run(() => RunWorkers(runnables, cts.Token));
            Console.ReadKey();
            cts.Cancel();
            t.GetAwaiter().GetResult();
        }
    }
}
allmhuran
  • Calling `new Random().Next()` is bad. You should always only create a single `Random` per thread used. – Enigmativity Jul 26 '20 at 10:09
  • @Enigmativity That's just a lazy simulation for demonstration of the pattern for this website without having to include all of the implementation guts. The "real" delay until next execution is provided by the output parameter of the executed stored procedure representing the actual work that needs to be done, simulated here for brevity by the combination of a `Task.Delay` and `Random.Next`. I have updated my question to make that clearer. – allmhuran Jul 26 '20 at 10:18
  • What type is the output of the executed stored procedure that controls the rescheduling? Is it a `TimeSpan` (duration) or a `DateTime` (specific date and time)? – Theodor Zoulias Jul 26 '20 at 14:50
  • @TheodorZoulias It's a `smallint` in SQL terms, or `short` in C# terms, which is the "suggested" number of seconds to wait until running this particular procedure again. The set of operations to perform is a set of such procedures, sp1, sp2 ... spN, each of which must be repeated until the application is terminated, and where each may specify a new suggested "delay until calling me again" on each execution. – allmhuran Jul 26 '20 at 15:01
  • That's good, because using `DateTime`s to synchronize systems that may run in different machines, with potentially different regional settings, would be problematic. – Theodor Zoulias Jul 26 '20 at 15:06

1 Answer

Your implementation is not thread-safe. The problem is not the List<Task> running variable: although it is mutated by different threads, it is accessed by a single asynchronous workflow, and the TPL takes care of adding the appropriate memory barriers when an asynchronous workflow switches threads. The problem is the bool working and DateTime nextExecutionTime fields, which can be accessed by multiple threads in parallel without synchronization. This could cause serious problems, like a Worker being scheduled more than once concurrently.

My suggestion is to move all the controlling code that reads and mutates the working and nextExecutionTime fields from the DoWorkAsync method to the central RunWorkers method, to get rid of the unwanted parallelism. This may require changing the result type of the DoWorkAsync method from Task to Task<Worker>, so that the central workflow knows which worker just completed and can mutate its fields accordingly.
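A minimal sketch of that restructuring is shown below, under a few assumptions. It is a variation on the Task<Worker> idea: the task returns the worker together with its suggested delay, so that only the central loop ever touches Working and NextExecutionTime. The names Scheduler, RunWorkersAsync, maxParallel and DelaySeconds are made up for illustration, the two stand-in values replace the real stored procedure call, and the question's DateTime-based timing is kept unchanged.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public sealed class Worker
{
    public int Id { get; }
    // Both fields are touched only by the scheduling loop, never inside DoWorkAsync.
    public bool Working;
    public DateTime NextExecutionTime = DateTime.MinValue;

    public Worker(int id) { Id = id; }

    // Uses no shared mutable state: the wait is passed in, the suggested delay is returned.
    public async Task<(Worker Worker, int DelaySeconds)> DoWorkAsync(TimeSpan wait, CancellationToken ct)
    {
        if (wait > TimeSpan.Zero) await Task.Delay(wait, ct);
        await Task.Delay(50, ct);            // stand-in for the database call (assumption)
        int suggestedSeconds = 1 + Id % 3;   // stand-in for the procedure's output (assumption)
        return (this, suggestedSeconds);
    }
}

public static class Scheduler
{
    public static async Task RunWorkersAsync(IReadOnlyList<Worker> workers, int maxParallel, CancellationToken ct)
    {
        if (workers.Count == 0 || maxParallel < 1) return;
        var running = new List<Task<(Worker Worker, int DelaySeconds)>>();
        try
        {
            while (!ct.IsCancellationRequested)
            {
                // Fill free slots with the idle workers that are due soonest.
                while (running.Count < maxParallel)
                {
                    Worker next = workers.Where(w => !w.Working)
                                         .OrderBy(w => w.NextExecutionTime)
                                         .FirstOrDefault();
                    if (next == null) break;
                    next.Working = true;
                    running.Add(next.DoWorkAsync(next.NextExecutionTime - DateTime.Now, ct));
                }

                var finished = await Task.WhenAny(running);
                running.Remove(finished);
                var result = await finished;   // rethrows cancellation or failure
                result.Worker.NextExecutionTime = DateTime.Now.AddSeconds(result.DelaySeconds);
                result.Worker.Working = false;
            }
        }
        catch (OperationCanceledException)
        {
            // Shutdown was requested; in-flight workers observe the same token.
        }
    }
}
```

Calling it would look something like `await Scheduler.RunWorkersAsync(runnables, 4, cts.Token);`. Because every read and write of the two fields happens inside one asynchronous workflow, no lock is needed here.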

Another potential problem is using DateTimes to control the rescheduling. The system-wide Now property is not guaranteed to always move forwards. It can be adjusted automatically or manually and move backwards, causing all kinds of weird and unexpected effects in the scheduling of the Workers. To fix this problem, consider replacing the DateTime-based scheduling with a TimeSpan-based one, using a Stopwatch as the measuring device.
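As a rough illustration of the TimeSpan-based idea, the sketch below stores each item's due time as an offset from the start of a single Stopwatch instead of as a wall-clock DateTime. The ScheduledItem class and its members are hypothetical names, and the `short` parameter reflects the number of seconds the stored procedure is said to return.

```csharp
using System;

public sealed class ScheduledItem
{
    public int Id { get; }

    // Due time expressed as an offset from the start of one shared Stopwatch,
    // so adjustments of the system clock cannot move it backwards.
    public TimeSpan DueAt { get; private set; } = TimeSpan.Zero;

    public ScheduledItem(int id) { Id = id; }

    // 'now' is the current Stopwatch.Elapsed value supplied by the scheduling loop.
    public void RescheduleIn(short seconds, TimeSpan now)
    {
        DueAt = now + TimeSpan.FromSeconds(seconds);
    }

    // How long to wait before running again; never negative.
    public TimeSpan RemainingAt(TimeSpan now)
    {
        return DueAt > now ? DueAt - now : TimeSpan.Zero;
    }
}
```

The scheduling loop would create one `Stopwatch.StartNew()` at startup, call `item.RescheduleIn(seconds, clock.Elapsed)` after each execution, and pass `item.RemainingAt(clock.Elapsed)` to `Task.Delay`. Ordering the items by DueAt then replaces ordering by nextExecutionTime.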

Regarding the inefficiency of the Task.WhenAny-in-a-loop pattern: it would become a consideration if you had ~1000 or more concurrently running tasks. In that case the problem would be quite serious, because the overhead grows quadratically, not linearly, with the number of tasks. For fewer than 100 tasks my opinion is that you should not worry about it, taking into account that any alternative (like using a PrioritySemaphore) would be considerably more complex.

The same can be said for the OrderBy operator. It would be more efficient to use an O(N) operator like MinBy from the MoreLinq library, but any performance benefit will most probably be negligible.
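For reference, a single-pass minimum search of that kind can be sketched without any extra library. The MinByOrDefault extension below is a hypothetical helper written for this illustration, not the MoreLinq implementation; it returns the first element with the smallest key, or the default value when the sequence is empty.

```csharp
using System;
using System.Collections.Generic;

public static class SchedulingExtensions
{
    // Single pass, O(N): remembers the smallest key seen so far instead of sorting.
    public static T MinByOrDefault<T, TKey>(this IEnumerable<T> source, Func<T, TKey> keySelector)
        where TKey : IComparable<TKey>
    {
        bool found = false;
        T best = default(T);
        TKey bestKey = default(TKey);
        foreach (T item in source)
        {
            TKey key = keySelector(item);
            // Strict comparison, so the first of several equal keys wins.
            if (!found || key.CompareTo(bestKey) < 0)
            {
                best = item;
                bestKey = key;
                found = true;
            }
        }
        return best;
    }
}
```

With it, the selection in the question could be written as `runnables.Where(r => !r.working).MinByOrDefault(r => r.nextExecutionTime)`.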

Theodor Zoulias
  • Thanks very much for your detailed answer. I thought the `Where(r => !r.working)` would "protect" the access to `nextExecutionTime` based on the atomicity of `bool`, but I had not read about the caching problem. As a curiosity, if `working` was declared `volatile`, would that then make the order by `nextExecutionTime` in the LINQ safe? I.e., given `nextExecutionTime` is only modified by `working` workers, and given the prior LINQ predicate eliminates `working` workers, then if `working` is made safe by `volatile`, would reading `nextExecutionTime` also be safe? – allmhuran Jul 26 '20 at 16:46
  • Just so it's clear, I will almost certainly simply take your advice and resort to locking to make all of this much easier, but your answer has made me think more about what's "really" going on, hence the followup question. – allmhuran Jul 26 '20 at 16:49
  • @allmhuran I updated my answer with a suggestion that removes the need for using a `lock`. – Theodor Zoulias Jul 26 '20 at 16:53
  • @allmhuran the `volatile` is a visibility-enabling mechanism, not a thread-safety-enabling mechanism. It ensures that a thread will see the changes made by another thread previously. It does not ensure that a thread will not see corrupted state, because another thread was suspended in the midst of updating the same state. For thread-safety you need synchronization mechanisms like a `lock` or a `Semaphore`. – Theodor Zoulias Jul 26 '20 at 17:00
  • So if I understand, even a volatile bool - with the cache "safety" of volatile, and the atomic "safety" of bool, still cannot really be used safely as shared state across multiple threads, and instead of trying to "cleverly" share state, it is always better to just use a lock for simplicity, or (obviously, but with more difficulty) refactor so that there is no shared state needed. Not an unexpected answer of course, but the details are interesting. – allmhuran Jul 26 '20 at 17:12
  • @allmhuran your followup question is not easy to answer. If you have time you could read [this article](https://docs.microsoft.com/en-us/archive/msdn-magazine/2012/december/csharp-the-csharp-memory-model-in-theory-and-practice) by Igor Ostrovsky. It will instantly evaporate any temptation you may have to include lock-free multi-threaded code in your application. :-) Here is the gist of the article: *"In short, locks hide all of the unpredictability and complexity weirdness of the memory model: You don’t have to worry about the reordering of memory operations if you use locks correctly."* – Theodor Zoulias Jul 26 '20 at 17:14
  • Thanks for the link and for the rest of your assistance. I will read that immediately! – allmhuran Jul 26 '20 at 17:19
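
For readers weighing the lock-based option discussed in these comments, the idea could look roughly like the sketch below. WorkerState, TryClaim and Release are hypothetical names. The point is only that the flag and the timestamp are always read and written together under one lock, so no thread can observe a half-updated pair or claim the same worker twice.

```csharp
using System;

public sealed class WorkerState
{
    private readonly object gate = new object();
    private bool working;
    private DateTime nextExecutionTime = DateTime.MinValue;

    // Atomically checks the flag and claims the worker; returns false if it is already running.
    public bool TryClaim(out DateTime due)
    {
        lock (gate)
        {
            due = nextExecutionTime;
            if (working) return false;
            working = true;
            return true;
        }
    }

    // Publishes the new due time and frees the worker as one indivisible step.
    public void Release(DateTime next)
    {
        lock (gate)
        {
            nextExecutionTime = next;
            working = false;
        }
    }
}
```

A scheduler would call TryClaim before starting a worker and Release once its database call has returned the next delay.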