
EDIT/Notice: Event<_> is now thread-safe in the current F# implementation.


I'm working a lot with asynchronous workflows and agents in F#. While digging a little deeper into events, I noticed that the Event<_>() type is not thread-safe.

I'm not talking about the common problem of raising an event here. I'm talking about subscribing to an event and removing/disposing handlers from it. For testing purposes, I have written this short program:

let event = Event<int>()
let sub   = event.Publish

[<EntryPoint>]
let main argv =
    let subscribe sub x = async {
        let mutable disposables = []
        for i=0 to x do
            let dis = Observable.subscribe (fun x -> printf "%d" x) sub
            disposables <- dis :: disposables
        for dis in disposables do
            dis.Dispose()
    }

    Async.RunSynchronously(async{
        let! x = Async.StartChild (subscribe sub 1000)
        let! y = Async.StartChild (subscribe sub 1000)
        do! x
        do! y
        event.Trigger 1
        do! Async.Sleep 2000
    })
    0

The program is simple. I create an event and a function that subscribes a given number of handlers to it and afterwards disposes every handler. I use another asynchronous computation to spawn two instances of that function with Async.StartChild. After both functions have finished, I trigger the event to see whether any handlers are still left.

But when event.Trigger(1) is called, there are still some handlers registered to the event, as some "1"s get printed to the console. That in general means that subscribing and/or disposing is not thread-safe.

And that is what I didn't expect. If subscribing and disposing are not thread-safe, how can events be used safely at all?

Sure, events can also be used outside of threads, and a trigger doesn't spawn any function in parallel or on different threads. But to me it is normal that events are used in Async, in agent-based code, or in general with threads. They are often used as a communication channel to gather information from background worker threads.

With Async.AwaitEvent it is possible to subscribe to an event. If subscribing and disposing are not thread-safe, how is it possible to use events in such an environment at all, and what purpose does Async.AwaitEvent have? Considering that an asynchronous workflow does thread hopping, just using Async.AwaitEvent is basically "broken by design" if subscribing/disposing is not thread-safe by default.
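
For reference, this is roughly how I would expect Async.AwaitEvent to be used (the helper name waitForNext is mine, just for illustration):

// Minimal sketch: the workflow subscribes to the event, suspends until the next
// value arrives, then unsubscribes again - all of which touches the handler list.
let waitForNext (evt : IEvent<int>) = async {
    let! value = Async.AwaitEvent evt
    printfn "received %d" value
}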

The general question I'm facing is: is it correct that subscribing and disposing are not thread-safe? From my example it looks like it, but maybe I missed some important detail. I currently use events a lot in my design; I usually have MailboxProcessors and use events for notification. If events are not thread-safe, then the whole design I'm currently using is not thread-safe at all. So what is a fix for this situation? Writing a whole new thread-safe event implementation? Do implementations that address this problem already exist? Or are there other options for using events safely in a highly threaded environment?

David Raab

2 Answers


FYI, the implementation of Event<int> can be found here.

The interesting bit seems to be:

member e.AddHandler(d) =
  x.multicast <- (System.Delegate.Combine(x.multicast, d) :?> Handler<'T>)
member e.RemoveHandler(d) = 
  x.multicast <- (System.Delegate.Remove(x.multicast, d) :?> Handler<'T>)

Subscribing to an event combines the current event handler with the event handler passed into subscribe. This combined event handler replaces the current one.
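
To make the read-modify-write explicit, here is a small illustration (not taken from the library) of what Combine does:

open System

// Delegate.Combine never mutates the existing delegate; it returns a new
// multicast delegate, so the result has to be written back to the field.
// That unsynchronized write-back is where the race lives.
let a = Handler<int>(fun _ x -> printfn "A saw %d" x)
let b = Handler<int>(fun _ x -> printfn "B saw %d" x)
let both = Delegate.Combine(a, b) :?> Handler<int>
both.Invoke(null, 42)   // prints "A saw 42" then "B saw 42"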

The problem from a concurrency perspective is that we have a race condition here: concurrent subscribers might read the same current event handler to combine with, and the "last" one that writes back its handler wins ("last" is a difficult concept in concurrency these days, but never mind).

What could be done here is to introduce a CAS loop using Interlocked.CompareExchange, but that adds performance overhead that hurts non-concurrent users. It's something one could make a PR of, though, and see if it is viewed favourably by the F# community.
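
To sketch the idea (the type and member names here are mine, and this covers only the add side):

open System
open System.Threading

// A minimal sketch of the CAS loop: re-read the field, combine, and only publish
// the combined delegate if the field still holds the value we read; otherwise
// another thread won the race and we retry.
type CasAddSketch<'T> =
    val mutable multicast : Handler<'T>
    new() = { multicast = null }

    member x.AddHandler(d : Handler<'T>) =
        let mutable exchanged = false
        while not exchanged do
            let current  = x.multicast
            let combined = Delegate.Combine(current, d) :?> Handler<'T>
            // CompareExchange returns whatever value it actually found in the field.
            let witness  = Interlocked.CompareExchange(&x.multicast, combined, current)
            exchanged <- obj.ReferenceEquals(witness, current)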

WRT your second question on what to do about it, I can just say what I would do. I would go for the option of creating a version of FSharpEvent that supports protected subscribe/unsubscribe, perhaps basing it on FSharpEvent if your company's FOSS policy allows it. If it turns out a success, it could form a future PR for the F# core library.

I don't know your requirements, but if what you need is coroutines (i.e. Async) and not threads, it's also possible to rewrite the program to use only one thread, and then you won't be affected by this race condition.

  • As for your last sentence: every async started with Async.Start runs on the thread pool, but using let! or do! can also switch the async to another thread; for example, just using Async.Sleep or Async.StartChild switches to the thread pool. On top of that, I'm triggering events from MailboxProcessors, and those also always run on some thread-pool thread. So I don't see how to make everything run on a single thread. But I also wouldn't want that behaviour; I'm expecting this kind of behaviour, as I don't want to run everything on a single thread anyway. – David Raab Jan 31 '16 at 14:19
  • It seems that the lack of thread safety on events is a significant problem with the F# core library. Would you consider opening an issue for this? – Fyodor Soikin Jan 31 '16 at 17:25
  • You could roll your own coroutines or make sure that each `let!`/`do!` posts back to the "main thread". But as you said, you don't want that behaviour anyway, so it doesn't matter. – Just another metaprogrammer Jan 31 '16 at 18:12

First of all, thanks to FuleSnabel for his answer. He pointed me in the right direction. Based on the information he provided, I implemented a ConcurrentEvent type myself. This type uses Interlocked.CompareExchange for adding/removing its handlers, so it is lock-free and hopefully the fastest way of doing it.

I started the implementation by copying the Event type from the F# compiler sources. (I also left its comment as-is.) The current implementation looks like this:

type ConcurrentEvent<'T> =
    val mutable multicast : Handler<'T>
    new() = { multicast = null }

    member x.Trigger(arg:'T) =
        match x.multicast with
        | null -> ()
        | d -> d.Invoke(null,arg) |> ignore
    member x.Publish =
        // Note, we implement each interface explicitly: this works around a bug in the CLR
        // implementation on CompactFramework 3.7, used on Windows Phone 7
        { new obj() with
            member x.ToString() = "<published event>"
          interface IEvent<'T>
          interface IDelegateEvent<Handler<'T>> with
            member e.AddHandler(d) =
                // CAS loop: re-read the current handler chain, combine, and only
                // write the result back if no other thread changed it in the meantime.
                let mutable exchanged = false
                while exchanged = false do
                    System.Threading.Thread.MemoryBarrier()
                    let dels    = x.multicast
                    let newDels = System.Delegate.Combine(dels, d) :?> Handler<'T>
                    let result  = System.Threading.Interlocked.CompareExchange(&x.multicast, newDels, dels)
                    if obj.ReferenceEquals(dels,result) then
                        exchanged <- true
            member e.RemoveHandler(d) =
                // Same CAS loop as in AddHandler, but with Delegate.Remove.
                let mutable exchanged = false
                while exchanged = false do
                    System.Threading.Thread.MemoryBarrier()
                    let dels    = x.multicast
                    let newDels = System.Delegate.Remove(dels, d) :?> Handler<'T>
                    let result  = System.Threading.Interlocked.CompareExchange(&x.multicast, newDels, dels)
                    if obj.ReferenceEquals(dels,result) then
                        exchanged <- true
          interface System.IObservable<'T> with
            member e.Subscribe(observer) =
                let h = new Handler<_>(fun sender args -> observer.OnNext(args))
                (e :?> IEvent<_,_>).AddHandler(h)
                { new System.IDisposable with
                    member x.Dispose() = (e :?> IEvent<_,_>).RemoveHandler(h) } }

Some notes on the design:

  • I started with a recursive loop, but looking at the compiled code, that creates an anonymous class, and every call to AddHandler or RemoveHandler allocated an instance of it. Implementing the loop directly with while avoids instantiating an object whenever a handler is added/removed (see the sketch after these notes).
  • I explicitly used obj.ReferenceEquals to avoid the generic (structural) equality check.
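
For the curious, the recursive loop I started with had roughly this shape. It is shown here as a free-standing sketch over a ref cell instead of the real multicast field, so the helper name and the ref cell are only for illustration:

open System
open System.Threading

// Recursive variant of the add loop (illustration only; the ref cell stands in
// for the real 'multicast' field). 'loop' closes over 'cell' and 'd', and that
// closure is the extra allocation that showed up in the compiled code.
let addHandlerRec (cell : Handler<'T> ref) (d : Handler<'T>) =
    let rec loop () =
        let dels    = cell.Value
        let newDels = Delegate.Combine(dels, d) :?> Handler<'T>
        let result  = Interlocked.CompareExchange(&cell.contents, newDels, dels)
        if not (obj.ReferenceEquals(dels, result)) then loop ()
    loop ()

The behaviour is the same as the while version; the difference is only in the closure object the compiler emits for loop.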

At least in my tests adding/removing a handler now seems to be thread-safe. ConcurrentEvent can just be swapped with the Event type as needed.


A benchmark, if people are curious how much slower ConcurrentEvent is compared to Event:

let stopWatch () = System.Diagnostics.Stopwatch.StartNew()

let event = Event<int>()
let sub   = event.Publish

let cevent = ConcurrentEvent<int>()
let csub   = cevent.Publish

let subscribe sub x = async {
    let mutable disposables = []
    for i=0 to x do
        let dis = Observable.subscribe (fun x -> printf "%d" x) sub
        disposables <- dis :: disposables
    for dis in disposables do
        dis.Dispose()
}

let sw = stopWatch()
Async.RunSynchronously(async{
    // Amount of tries
    let tries = 10000

    // benchmarking Event subscribe/unsubscribing
    let sw = stopWatch()
    let! x = Async.StartChild (subscribe sub tries)
    let! y = Async.StartChild (subscribe sub tries)
    do! x
    do! y
    sw.Stop()
    printfn "Event: %O" sw.Elapsed
    do! Async.Sleep 1000
    event.Trigger 1
    do! Async.Sleep 2000

    // Benchmarking ConcurrentEvent subscribe/unsubscribing
    let sw = stopWatch()
    let! x = Async.StartChild (subscribe csub tries)
    let! y = Async.StartChild (subscribe csub tries)
    do! x
    do! y
    sw.Stop()
    printfn "\nConcurrentEvent: %O" sw.Elapsed
    do! Async.Sleep 1000
    cevent.Trigger 1
    do! Async.Sleep 2000
})

On my system subscribing/unsubscribing 10,000 handlers with the non-thread-safe Event takes around 1.4 seconds to complete.

The thread-safe ConcurrentEvent takes around 1.8 seconds to complete. So I think the overhead is pretty low.

David Raab
  • Something to consider: I am unsure whether `CompareExchange` inserts a memory barrier (and what kind it is) or whether you have to issue one manually. – Just another metaprogrammer Jan 31 '16 at 18:20
  • It seems `CompareExchange` implies a full barrier (http://stackoverflow.com/questions/1581718/does-interlocked-compareexchange-use-a-memory-barrier). AFAIK x86 reads use an `acquire` barrier, so that should imply you are good on x86, but if you want the code to target ARM/PowerPC you might need to insert `Interlocked` calls when reading `x.multicast`. – Just another metaprogrammer Jan 31 '16 at 18:26
  • @FuleSnabel I had made an error anyway, as I used `x.multicast` in `Delegate.Combine` and `Delegate.Remove`; the operation should rather work on the fetched `dels` variable. As far as I can tell, I then only need a single MemoryBarrier before reading from `x.multicast` to ensure the freshness of `x.multicast`. – David Raab Jan 31 '16 at 19:11
  • It's also that the data written by `Combine` has to be visible before being read by another thread. – Just another metaprogrammer Jan 31 '16 at 21:01
  • @FuleSnabel `Combine` and `Remove` write to a local variable, so a memory barrier should not be needed there. I think you probably mean after `CompareExchange`? I'm still reading up on whether a MemoryBarrier is really needed. – David Raab Jan 31 '16 at 21:39
  • `Delegate.Combine` allocates a new object; ultimately this is bytes in memory, regardless of the abstractions we normally deal with. The written bytes have to be committed before the `CompareExchange` completes; the implicit memory barrier should ensure that. But the thread that reads the memory also must not schedule the reading of the bytes before reading the address. Thinking about it more closely, that should be OK in this case, as this is an address and the reads of the bytes are dependent on that address. So it should be OK here. If it were a flag, it might have been different. – Just another metaprogrammer Feb 01 '16 at 12:50
  • @FuleSnabel You don't need a MemoryBarrier for thread-local variables. The point of a memory barrier is that pending reads/writes on a variable are committed so other threads that also access that variable see the most recent value. Variables that are not accessed from multiple threads don't need memory barriers; otherwise we would even need MemoryBarriers in single-threaded code. – David Raab Feb 01 '16 at 13:59
  • Stack Overflow is complaining about us having a discussion. The state of the Delegate object isn't thread-local; that is shared between threads. But never mind, according to Stack Overflow, discussing like this is bad. – Just another metaprogrammer Feb 01 '16 at 17:32
  • @FuleSnabel A Delegate is an immutable object. `let dels = x.multicast` creates a thread-local copy. To ensure the freshness of `x.multicast` I added a MemoryBarrier before the assignment. After that, `Combine` and `Remove` work on a thread-local copy of `x.multicast`. The only important question is whether a MemoryBarrier after `CompareExchange` is needed, because only CompareExchange will change x.multicast. But as far as I found, CompareExchange should emit memory barriers, and Mono also emits explicit memory barriers after CompareExchange on systems other than Windows. – David Raab Feb 01 '16 at 20:14