
TL;DR: I set a flag in a callback, but the main loop never sees the new value.

Unfortunately, I have to post a simplified version of the code because the original is quite large, but here is the problem I am facing, in a nutshell:

```fsharp
try
    try
        exchangeBus.OnTradeEvent.AddHandler(tradeEventHandler)
        exchangeBus.OnOrderEvent.AddHandler(orderEventHandler)

        let mutable keepGoing = true
        while keepGoing do

            let nodeRunner = buildRunner()
            let waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset)

            // handle the completion event
            nodeRunner.OnCompletionEvent.Add(fun (completionType: CycleCompletionType) ->
                match completionType with
                | CodeException ex -> reportError side ex true
                                      keepGoing <- false
                                      abortHandle.Set() |> ignore

                | Abort reason     -> // do some stuff
                                      keepGoing <- false
                                      abortHandle.Set() |> ignore

                | AutoAbort reason -> // do some stuff
                                      waitHandle.Set() |> ignore

                | NormalClose      -> // do some stuff
                                      waitHandle.Set() |> ignore

            )

            // start the runner
            nodeRunner.Startup()

            // wait for completion, or abort
            WaitHandle.WaitAny([| waitHandle; abortHandle |]) |> ignore


    with ex ->
        status <- ExceptionState
        getRunner(side)          ||> (fun r -> r.Signal(CycleCompletionType.CodeException(ex)))
        getRunner(side.Opposite) ||> (fun r -> r.Signal(CycleCompletionType.Abort($"exception on the {side.Opposite} side, aborting")))

finally
    exchangeBus.OnTradeEvent.RemoveHandler(tradeEventHandler)
    exchangeBus.OnOrderEvent.RemoveHandler(orderEventHandler)
```

So, in short, here is how it works:

Two objects containing this code run in parallel. Each one implements a loop in which it creates an object that holds a graph and executes a sequence of code based on the nodes in that graph.

An exception can also occur outside the code executing the graph, in the callbacks tradeEventHandler and orderEventHandler, but that case is handled separately.

Once the code executing the graph (nodeRunner) completes, it raises an event describing how it finished. Two cases are fine (NormalClose and AutoAbort) and two require the process to stop (CodeException and Abort).

There are two EventWaitHandle objects: one (waitHandle) is unique to each object and controls the loop; the other (abortHandle) is shared by the two objects and is used to tell both of them to stop.
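As a side note, and not part of the fix discussed in the answer: `WaitHandle.WaitAny` returns the array index of the handle that released the wait (the smallest index if several are already signaled), which the loop discards with `|> ignore`. The sketch below models just the two-handle wait in isolation; `waitForCompletionOrAbort` and `demoAbort` are hypothetical names, and a plain `Thread` stands in for whatever raises the completion event in the real code.

```fsharp
open System.Threading

/// Waits on a per-iteration handle (index 0) and a shared abort handle
/// (index 1) and reports which one released the wait. If both are already
/// signaled, WaitAny returns the smaller index, i.e. 0.
let waitForCompletionOrAbort (waitHandle: WaitHandle) (abortHandle: WaitHandle) =
    match WaitHandle.WaitAny([| waitHandle; abortHandle |]) with
    | 0 -> "completed"
    | 1 -> "aborted"
    | other -> failwithf "unexpected WaitAny result %d" other

/// Hypothetical demo: a worker thread plays the role of the completion
/// callback and signals only the shared abort handle.
let demoAbort () =
    use waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset)
    use abortHandle = new EventWaitHandle(false, EventResetMode.ManualReset)
    let worker = Thread(fun () -> abortHandle.Set() |> ignore)
    worker.Start()
    let result = waitForCompletionOrAbort waitHandle abortHandle
    worker.Join()
    result
```

Checking the returned index tells the caller which handle fired without consulting any other shared state.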

The issue happens when one of the objects gets into the abort state.

The OnCompletionEvent callback runs, and two things happen:

```fsharp
keepGoing <- false
abortHandle.Set() |> ignore
```

abortHandle gets set, and execution resumes after the WaitHandle.WaitAny call, as expected.

The keepGoing flag (which is local to that loop) gets set to false and... the loop goes on as if it were still true. When I print its value at the beginning of the loop, it still shows true, not false.

So there must be a threading-related issue. I don't know which keepGoing the OnCompletionEvent callback sets to false, but it's certainly not the one the loop reads.
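The "different copy" hypothesis can be checked in isolation. Since F# 4.0, a `let mutable` local that is captured by a closure is compiled as a heap-allocated reference cell, so the closure and the enclosing loop read and write the same storage. The sketch below is single-threaded and uses a hypothetical local function in place of the OnCompletionEvent handler; it only shows that the storage is shared, not whether another thread's write becomes visible, which is what the answer addresses.

```fsharp
/// Since F# 4.0, a `let mutable` local captured by a closure is implicitly
/// turned into a reference cell, so both sides see the same value.
let closureSharesTheFlag () =
    let mutable keepGoing = true
    // Hypothetical stand-in for the OnCompletionEvent handler: captures keepGoing.
    let onCompletion () = keepGoing <- false
    let mutable iterations = 0
    while keepGoing do
        iterations <- iterations + 1
        if iterations = 3 then onCompletion ()
    iterations
```

On a single thread the loop stops right after the closure clears the flag, so `closureSharesTheFlag ()` returns 3.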

How can I solve this? Or did I miss something obvious?

Thomas
  • You may need to use `Volatile` or `Interlocked` for the modifications to `keepGoing` to propagate across threads. Try defining `keepGoing` as a reference, i.e. `let keepGoing = ref true` and then do `Volatile.Write(keepGoing, false)` and `Volatile.Read(keepGoing)`. – dbc Jan 23 '21 at 18:53

1 Answer


You may need to use Volatile or Interlocked for the modifications to keepGoing to be immediately visible across threads.

Since keepGoing is not a field, you can't apply [<VolatileField>] to it, so you could pass it by reference to the Volatile methods as follows:

```fsharp
let mutable keepGoing = true
while (Volatile.Read(&keepGoing)) do
    // Do your processing
    // And eventually disable keepGoing
    Volatile.Write(&keepGoing, false)
```

Or define it as a reference cell and use it like so:

```fsharp
let keepGoing = ref true
while (Volatile.Read(keepGoing)) do
    // Do your processing
    // And eventually disable keepGoing
    Volatile.Write(keepGoing, false)
```
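Putting the ref-cell variant into the shape of the question's loop gives the following self-contained model. `Completion`, `runLoop`, and `outcomeFor` are hypothetical stand-ins for `CycleCompletionType`, the loop, and `nodeRunner`; a plain `Thread` playing the part of whatever thread raises `OnCompletionEvent` is an assumption.

```fsharp
open System.Threading

/// Hypothetical stand-in for the question's CycleCompletionType.
type Completion =
    | NormalClose
    | Abort of reason: string

/// Models the question's loop: each iteration hands a completion callback to a
/// worker thread; `outcomeFor` (hypothetical) decides how iteration i ends.
/// Returns the number of iterations that ran.
let runLoop (outcomeFor: int -> Completion) =
    use abortHandle = new EventWaitHandle(false, EventResetMode.ManualReset)
    let keepGoing = ref true
    let mutable iterations = 0
    while Volatile.Read(keepGoing) do
        iterations <- iterations + 1
        let iteration = iterations
        use waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset)
        // Stand-in for the OnCompletionEvent handler, run on another thread.
        let onCompletion () =
            match outcomeFor iteration with
            | Abort _ ->
                Volatile.Write(keepGoing, false)
                abortHandle.Set() |> ignore
            | NormalClose ->
                waitHandle.Set() |> ignore
        let worker = Thread(fun () -> onCompletion ())
        worker.Start()
        WaitHandle.WaitAny([| waitHandle :> WaitHandle; abortHandle :> WaitHandle |]) |> ignore
        worker.Join()
    iterations
```

For example, `runLoop (fun i -> if i = 3 then Abort "stop" else NormalClose)` runs three iterations and then stops, because the third callback clears the flag before signaling the shared abort handle.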

This can be necessary because, as explained in the remarks for Volatile:

> On a multiprocessor system, a volatile write operation ensures that a value written to a memory location is immediately visible to all processors. A volatile read operation obtains the very latest value written to a memory location by any processor. These operations might require flushing processor caches, which can affect performance.
>
> On a uniprocessor system, volatile reads and writes ensure that a value is read or written to memory and not cached (for example, in a processor register). Thus, you can use these operations to synchronize access to a field that can be updated by another thread or by hardware.
>
> ... Some languages, such as Visual Basic, do not recognize the concept of volatile memory operations. The Volatile class provides that functionality in such languages.

Thus it could be that when one of your threads modified the value of keepGoing, the others did not see the change in time because they had already fetched the value into a processor cache or register.
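If the flag can be moved out of the loop and into a class field, [<VolatileField>] becomes an option: the FSharp.Core documentation describes it as causing the volatile prefix to be used for all accesses to the field. The sketch below assumes such a restructuring; `StoppableWorker`, `Run`, and `Stop` are hypothetical names.

```fsharp
open System.Threading

/// Hypothetical worker whose stop flag is a class field rather than a local,
/// so [<VolatileField>] can be applied to it.
type StoppableWorker() =
    // Every read and write of this field uses volatile semantics.
    [<VolatileField>]
    let mutable keepGoing = true

    /// Called from another thread to request a stop.
    member this.Stop() = keepGoing <- false

    /// Spins until Stop is called and returns how many iterations ran.
    member this.Run() =
        let mutable iterations = 0L
        while keepGoing do
            iterations <- iterations + 1L
        iterations
```

Typical use: start `worker.Run()` on one thread and call `worker.Stop()` from another; the loop then exits on its next check of the field.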


dbc
  • This worked, but the question is why? Since it's a bool, the write is supposed to be atomic, and I do the write before I set the wait object. – Thomas Jan 23 '21 at 21:40
  • @Thomas - it's necessary because processors may have cached the value of `keepGoing`, so writes from one processor may not be reflected in the cache of the other. Take a look at [When should the volatile keyword be used in C#?](https://stackoverflow.com/q/72275/3744182) and also the [docs](https://stackoverflow.com/q/72275/3744182). I updated my answer to include this explanation. – dbc Jan 23 '21 at 22:38
  • I think the issue may be related more to the JIT compiler than to the CPU: since I set the value, then set the wait object, and only then check the variable, there is no concurrency during execution. It is likely that the compiler is somehow not updating a value it has already read. I'd have to look at the IL to understand this and figure out whether it's something in my code or whether the compiler generates something wrong (it reproduces 100% of the time). – Thomas Jan 23 '21 at 22:52
  • Lippert explains it. Don't waste your time, except for curiosity's sake. – Bent Tranberg Jan 24 '21 at 08:30