0

I have a pipeline that consists of several stages. Jobs in the same stage can be worked on in parallel. But all jobs in stage 1 have to completed before anyone can start working on jobs in stage 2, etc..

I was thinking of synchronizing this work using a CountDownEvent.

My basis structure would be

this.WorkerCountdownEvent = new CountdownEvent(MaxJobsInStage);
this.WorkerCountdownEvent.Signal(MaxJobsInStage); // Starts all threads
// Each thread runs the following code

for (this.currentStage = 0; this.currentStage < this.PipelineStages.Count; this.currentStage++)
{
    this.WorkerCountdownEvent.Wait();
    var stage = this.PipelineStages[this.currentStage];
    if (stage.Systems.Count < threadIndex)
    {
        var system = stage.Systems[threadIndex];
        system.Process();
    }

    this.WorkerCountdownEvent.Signal(); // <--

}

This would work well for processing one stage. But the first thread that reaches this.WorkerCountdownEvent.Signal() will crash the application as its trying to decrement the signal to below zero.

Of course if I want prevent this, and have the jobs to wait again, I have to call this.WorkerCountdownEvent.Reset(). But I have to call it after all threads have started working, but before one thread is done with its work. Which seems like an impossible task?

Am I using the wrong synchronization primitive? Or should I use two countdown events? Or am I missing something completely?

(Btw usually jobs will take less than a milliseconds so bonus points if someone has a better way to do this using 'slim' primitives like ManualResetEventSlim. ThreadPools, or Task<> are not the direction I'm looking at since these threads will live for very long (hours) and need to go through the pipeline 60x per second. So the overhead of stopping/starting a Tasks is considerable here).

Edit: this questions was flagged as a duplciate of two questions. One of the questons was answered with "use thread.Join()" and the other one was answered with "Use TPL" both answers are (in my opnion) clearly not answers to a question about pipelining and threading primitives such as CountDownEvent.

Roy T.
  • 9,429
  • 2
  • 48
  • 70
  • How do you process the work? Using threads, using tasks, ...? – Thomas Weller Aug 23 '20 at 12:23
  • So far I'm using manually created tasks. Its for a game engine so the threads live long and have to wake up quite fast so the thread pool or tasks (which indirectly use the thread pool) didn't seem like a good idea. – Roy T. Aug 23 '20 at 12:25
  • How are manually created tasks better than other tasks? Just because you create them manually will not make them better? Does it? You can't run more tasks in parallel than you have CPUs. – Thomas Weller Aug 23 '20 at 12:30
  • 6
    If you are using tasks, why not just use `Task.WhenAll`. For example call `await Task.WhenAll(stage1Tasks)` then start stage2 tasks when it is completed. – Sherif Elmetainy Aug 23 '20 at 12:42
  • I've updated my question. Sorry for the confusion but I did not mean Task<> but the English word task :). – Roy T. Aug 23 '20 at 19:40
  • 1
    I suggest to remove all instances of the word "tasks" from your question, because you are not referring to `Task` objects, and `Task`s are currently extremely prevalent as tools for solving problems related to asynchrony and parallelism. So talking about "tasks" without referring to `Task`s becomes a source of confusion. – Theodor Zoulias Aug 23 '20 at 22:23
  • 1
    I've tried to make it even clearer and replaced the disclaimer with removing all mentions of the word task and an explanation why Task<> would not be appropriate. But unfortunately the question has already been closed as a duplicate of, in my opinion, 2 very different questions. – Roy T. Aug 24 '20 at 11:01
  • 1
    I solved this question by creating my own version of a `CountDownEvent` unfortunately I can't post an answer... So I hope this link stays alive for a while: https://github.com/roy-t/EntitySystemTest/blob/master/Project/Threading/ThreadingPrimitive.cs – Roy T. Aug 25 '20 at 21:44
  • Yeap, it can be frustrating to have your question marked single-handedly as a duplicate of an irrelevant question by a privileged user, but this is how this site works and not much can be done about it. What you could do in anticipation of such an event is to write a dummy answer and delete it, so that you can edit it later and undelete it. Editing and undeleting answers of closed questions is allowed. – Theodor Zoulias Aug 27 '20 at 05:07
  • 1
    Yeah I even flagged it for moderator attention, no results though. That's a good trick to remember btw :). – Roy T. Aug 27 '20 at 09:37

1 Answers1

1

I think that the most suitable synchronization primitive for this case is the Barrier.

Enables multiple tasks to cooperatively work on an algorithm in parallel through multiple phases.

Usage example:

private Barrier _barrier = new Barrier(this.WorkersCount);

// Each worker thread runs the following code
for (i = 0; i < this.StagesCount; i++)
{
    // Here goes the work of a single worker for a single stage...
    _barrier.SignalAndWait();
}

Update: In case you want the workers to wait for the signal asynchronously, there is an AsyncBarrier implementation here.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104