
I read the ZeroMQ guide and stumbled upon the following:

You MUST NOT share ØMQ sockets between threads. ØMQ sockets are not threadsafe. Technically it's possible to do this, but it demands semaphores, locks, or mutexes. This will make your application slow and fragile. The only place where it's remotely sane to share sockets between threads are in language bindings that need to do magic like garbage collection on sockets.

and later on:

Remember: Do not use or close sockets except in the thread that created them.

I also understood that the ZeroMQ Context is threadsafe.

In .NET, if a class subscribes to an event of another class, that event might be raised on a different thread than the one the listener was created on.

I think there are only two ways to send something through a ZeroMQ socket from within an event handler:

  • Synchronize the thread that invokes the event handler with the thread the ZeroMQ socket was created in
  • Inside the event handler, create a new ZeroMQ socket (or get the existing ZeroMQ socket for the current thread) by using the thread-safe ZeroMQ context

The 0MQ guide seems to discourage the first option, and I don't think that creating a new ZeroMQ socket for each thread is performant or the way to go.

My question:
What is the correct pattern (the way it is meant to be done) to publish messages via 0MQ from within an event handler?

Also, did the authors of the guide have the .NET ZeroMQ binding in mind when they wrote:

The only place where it's remotely sane to share sockets between threads are in language bindings that need to do magic like garbage collection on sockets. ?

Here is some sample code to illustrate my problem/question:

```csharp
using System;

public class ExampleClass
{
    public event EventHandler<ByteEventArgs> SomethinIsCalledFromAnotherThread;
}

public class ByteEventArgs : EventArgs
{
    public byte[] BytesToSend;
}

public class Dispatcher
{
    ZMQ.Context ctx;

    public Dispatcher(ZMQ.Context mqcontext, ExampleClass exampleClassInstance)
    {
        this.ctx = mqcontext;
        exampleClassInstance.SomethinIsCalledFromAnotherThread += new EventHandler<ByteEventArgs>(exampleClass_SomethinIsCalledFromAnotherThread);
    }

    void exampleClass_SomethinIsCalledFromAnotherThread(object sender, ByteEventArgs e)
    {
        // this method might be called by a different thread. So I have to get a new socket etc?
        using (var socket = ctx.Socket(ZMQ.SocketType.PUSH))
        {
            // init socket etc..... and finally:
            socket.Send(e.BytesToSend);
        }
        // isn't that too much overhead?
    }
}
```
tobsen

3 Answers


You can create lots of 0MQ sockets, certainly as many as you have threads. If you create a socket in one thread, and use it in another, you must execute a full memory barrier between the two operations. Anything else will result in weird random failures in libzmq, as socket objects are not threadsafe.

There are a few conventional patterns, though I don't know how these map specifically to .NET:

  1. Create sockets in the threads that use them, period. Share contexts between threads that are tightly bound into one process, and create separate contexts in threads that are not tightly bound. In the high-level C API (czmq) these are called attached and detached threads.
  2. Create a socket in a parent thread and pass it at thread creation time to an attached thread. The thread creation call will execute a full memory barrier. From then on, use the socket only in the child thread. "Use" means recv, send, setsockopt, getsockopt, and close.
  3. Create a socket in one thread, and use in another, executing your own full memory barrier between each use. This is extremely delicate and if you don't know what a "full memory barrier" is, you should not be doing this.
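In .NET, pattern 2 might look roughly like the following minimal sketch. Assumptions: `FakeSocket` is a hypothetical stand-in, not a type from any ZeroMQ binding, so the example runs without libzmq; it only records the managed thread ID of every call so ownership can be checked. The parent creates it, passes it through `Thread.Start(object)`, and never uses it again, relying (as the answer does) on thread creation acting as a full memory barrier. A dedicated `Thread` is used instead of a thread-pool `Task`, because with a `Task` you do not control which thread runs the code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for a ZeroMQ socket; NOT a real binding type.
// It records the managed thread ID of every call so ownership can be verified.
public sealed class FakeSocket : IDisposable
{
    public readonly List<int> CallerThreadIds = new List<int>();

    public void Send(byte[] data) => CallerThreadIds.Add(Thread.CurrentThread.ManagedThreadId);

    public void Dispose() => CallerThreadIds.Add(Thread.CurrentThread.ManagedThreadId);
}

public static class HandOffAtCreation
{
    // Creates the socket in the calling (parent) thread, hands it to a new thread
    // at creation time, and lets only that child thread use and close it.
    public static FakeSocket Run(int messages, out int childThreadId)
    {
        var socket = new FakeSocket();           // created in the parent thread
        int ownerId = 0;

        var child = new Thread(state =>
        {
            var owned = (FakeSocket)state;       // from here on, only this thread touches it
            ownerId = Thread.CurrentThread.ManagedThreadId;
            for (int i = 0; i < messages; i++)
                owned.Send(new byte[] { (byte)i });
            owned.Dispose();                     // "close" also happens in the owning thread
        });

        child.Start(socket);                     // pass the socket at thread creation time
        child.Join();                            // the parent waits but never uses the socket
        childThreadId = ownerId;
        return socket;                           // returned only so the caller can inspect the log
    }
}
```

With a real binding, the child thread would be a long-lived loop (for example draining a queue) rather than a fixed number of sends.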
Pieter Hintjens
  • In .NET, there are a lot of operations that involve the thread pool (e.g. Task) where an operation gets run on the next available thread; you don't know which thread it will be. – Jeramy Rutley Nov 04 '12 at 15:50
  • There are several spots where full memory barriers are used implicitly, but I suspect that testing this code would be extremely tricky. I just read the quote in the question this morning and wanted to see if anyone solved this. – Jeramy Rutley Nov 04 '12 at 16:04
  • How about the context? Is it recommended to share it between threads or create contexts for different threads? – liang Mar 07 '15 at 02:44
  • @Pieter, Rest in Peace – MaYaN Oct 07 '16 at 17:49

In .NET Framework 4 and later you can use the concurrent collections to solve this problem, namely with the producer-consumer pattern. Multiple threads (handlers) can add data to a thread-safe queue, and a single thread consumes data from the queue and sends it using the socket.

Here is the idea:

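```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

// MyStuff, MyHandler and OnMyAppExit are placeholders for your own type and events
var sendQueue = new BlockingCollection<MyStuff>(new ConcurrentQueue<MyStuff>());
// concurrent queue can accept from multiple threads/handlers safely
MyHandler += (MyStuff stuffToSend) => sendQueue.Add(stuffToSend);

// start single-threaded data send loop; LongRunning hints that the blocking loop needs its own thread
Task.Factory.StartNew(() => {
    // the socket is created, used and closed only inside this loop's thread
    using (var socket = context.Socket(ZMQ.SocketType.PUSH)) {
        // connect or bind the socket here
        // this enumerable will be blocking until CompleteAdding is called
        foreach (var stuff in sendQueue.GetConsumingEnumerable())
            socket.Send(stuff.Serialize());
    }
}, TaskCreationOptions.LongRunning);

// break out of the send loop when done
OnMyAppExit += sendQueue.CompleteAdding;
```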
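A self-contained variant of the idea above that runs without ZeroMQ, as a sketch under stated assumptions: the socket is replaced by a hypothetical `Action<byte[]>` send delegate, and several producer threads stand in for event handlers raised on arbitrary threads. It checks that every message is sent and that only one thread ever calls the send delegate.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SingleSenderQueue
{
    // Runs `producers` threads that each enqueue `perProducer` messages while a single
    // consumer drains the queue and "sends" them. Returns the managed thread IDs seen by
    // the send delegate and the number of messages sent.
    public static (HashSet<int> senderThreads, int sent) Run(int producers, int perProducer)
    {
        var sendQueue = new BlockingCollection<byte[]>(new ConcurrentQueue<byte[]>());
        var senderThreads = new HashSet<int>();
        int sent = 0;

        // Hypothetical stand-in for socket.Send(...): only ever called by the consumer.
        Action<byte[]> send = bytes =>
        {
            senderThreads.Add(Thread.CurrentThread.ManagedThreadId);
            sent++;
        };

        var consumer = Task.Factory.StartNew(() =>
        {
            // In real code, create (and later dispose) the socket here, inside this thread.
            foreach (var bytes in sendQueue.GetConsumingEnumerable())
                send(bytes);
        }, TaskCreationOptions.LongRunning);

        // Producers (think: event handlers on arbitrary threads) only touch the thread-safe queue.
        var producerThreads = Enumerable.Range(0, producers).Select(p => new Thread(() =>
        {
            for (int i = 0; i < perProducer; i++)
                sendQueue.Add(new byte[] { (byte)p, (byte)i });
        })).ToList();

        producerThreads.ForEach(t => t.Start());
        producerThreads.ForEach(t => t.Join());

        sendQueue.CompleteAdding();   // lets GetConsumingEnumerable() finish
        consumer.Wait();              // the send loop has now exited
        return (senderThreads, sent);
    }
}
```

Because the consumer is the only code that calls `send`, the counters need no locking; they are read only after `consumer.Wait()` returns.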
Vadym Chekan
  • I used this approach recently, and in my opinion it is an idiomatic way to handle this "send only from one thread" situation in .NET 4. However, it does not solve the cases where one might want to have multiple senders, and I am currently experimenting with the [ThreadStatic] attribute and some worker threads. The only issue I currently have is properly closing the sockets once they are no longer needed. – tobsen Feb 11 '13 at 15:32
  • tobsen: why can't you have multiple senders putting objects into the queue? That's what I've done in the past. – bj0 May 28 '13 at 18:05
  • @tobsen, the example above is explicitly for multiple concurrent senders. The comment in the code explicitly says "concurrent queue can accept from multiple threads/handlers safely". What do you mean "it does not solve those cases where one might want to have multiple senders", because the sole purpose of the code above is to do exactly that? – Vadym Chekan May 28 '13 at 18:19
  • @bj0 in my comment, "senders" means "consumers of the queue who then send via 0mq". In other words: when I wrote the comment, I was aware that I can have multiple producers on different threads. However, I didn't know back then that I can have multiple consumers of the BlockingCollection too. – tobsen May 28 '13 at 18:31
  • Why I didn't accept this answer yet: this solution requires .NET 4, which has a lock-free collection. I am still wondering which pattern to use in earlier .NET versions that still avoids "semaphores, locks, or mutexes", which, according to the ZMQ guide, "make your application slow and fragile". – tobsen May 28 '13 at 18:41
  • @tobsen, what the ZMQ guide is referring to is "naive" locks, where the consumer locks the queue for a long time, which causes sender threads to block so the whole app stalls while sending is in progress. If you can't use Framework 4, you can still use the producer/consumer approach, but your code will be more complicated. – Vadym Chekan May 28 '13 at 19:03
  • I am currently using a producer/consumer pattern featuring `lock` to synchronize access to a queue. I felt bad due to the strong statement in the zmq guide. You basically tell me I don't have to feel bad. Here, have an upvote ;) – tobsen May 28 '13 at 19:19
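For .NET versions before 4, where `BlockingCollection<T>` is not available, the producer/consumer approach discussed in these comments can be built from `lock` plus `Monitor.Wait`/`Monitor.Pulse`. This is a minimal sketch, not a library type: `SimpleBlockingQueue<T>` is a hypothetical name, and the code sticks to C# 3 syntax so it should compile against .NET 3.5. Each lock is held only long enough to enqueue or dequeue one item, never while sending, which is the difference between short locks and the "naive" locks described above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Minimal blocking queue for pre-.NET 4 code; the name SimpleBlockingQueue is hypothetical.
public sealed class SimpleBlockingQueue<T>
{
    private readonly Queue<T> items = new Queue<T>();
    private readonly object gate = new object();
    private bool completed;

    // Called from any producer thread (e.g. event handlers).
    public void Add(T item)
    {
        lock (gate)
        {
            if (completed) throw new InvalidOperationException("Adding has been completed.");
            items.Enqueue(item);
            Monitor.Pulse(gate);          // wake one waiting consumer, if any
        }
    }

    // Signals that no more items will be added; consumers drain what is left and stop.
    public void CompleteAdding()
    {
        lock (gate)
        {
            completed = true;
            Monitor.PulseAll(gate);       // wake every waiting consumer so it can exit
        }
    }

    // Blocks until an item is available. Returns false once the queue is empty and completed.
    public bool TryTake(out T item)
    {
        lock (gate)
        {
            while (items.Count == 0 && !completed)
                Monitor.Wait(gate);       // releases the lock while waiting
            if (items.Count == 0)
            {
                item = default(T);
                return false;
            }
            item = items.Dequeue();
            return true;
        }
    }
}
```

The thread that owns the socket would loop with `while (queue.TryTake(out msg)) socket.Send(msg);` and close the socket after the loop ends; producers call `Add`, and shutdown code calls `CompleteAdding`.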

Don't forget to have a look at the inproc transport. It might be useful to use inproc:// sockets for inter-thread communication and have one thread that opens sockets to talk to other processes/servers.

You still need at least one socket per thread, but the inproc ones do not involve the IP network layer at all.
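In .NET, the "at least one socket per thread" requirement can be met lazily with `ThreadLocal<T>` (the `trackAllValues` constructor overload and the `Values` property need .NET 4.5 or later). Because it can enumerate every value it created, it also helps with the socket-closing problem raised in the [ThreadStatic] comment on the previous answer. A minimal sketch under stated assumptions: `ThreadBoundFakeSocket` and `PerThreadSender` are hypothetical names, and the fake socket only counts sends and throws if used off its creating thread; with a real binding the factory would create and connect a socket (for example an inproc:// one) from the shared, thread-safe context.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for a ZeroMQ socket; remembers its creating thread and whether it was closed.
public sealed class ThreadBoundFakeSocket : IDisposable
{
    public int CreatorThreadId { get; } = Thread.CurrentThread.ManagedThreadId;
    public int Sent { get; private set; }
    public bool Disposed { get; private set; }

    public void Send(byte[] data)
    {
        if (Thread.CurrentThread.ManagedThreadId != CreatorThreadId)
            throw new InvalidOperationException("socket used outside its creating thread");
        Sent++;
    }

    public void Dispose() => Disposed = true;
}

public sealed class PerThreadSender : IDisposable
{
    // One socket per thread, created on first use in that thread;
    // trackAllValues lets us find every created socket later.
    private readonly ThreadLocal<ThreadBoundFakeSocket> sockets =
        new ThreadLocal<ThreadBoundFakeSocket>(() => new ThreadBoundFakeSocket(), trackAllValues: true);

    // Safe to call from any thread: each thread only ever sees its own socket.
    public void Send(byte[] data) => sockets.Value.Send(data);

    public IList<ThreadBoundFakeSocket> AllSockets => sockets.Values;

    // Call only after every thread that used Send has finished (e.g. been joined).
    public void Dispose()
    {
        foreach (var s in sockets.Values)
            s.Dispose();
        sockets.Dispose();
    }
}
```

Note that `Dispose` closes the sockets from the calling thread, which bends the guide's "close only in the creating thread" rule. It is only defensible once every worker thread has finished and been joined, so the hand-over is covered by a full memory barrier (pattern 3 in the first answer). With thread-pool threads that never exit, that point is effectively application shutdown, once no more events can fire.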

Michael Dillon
  • Michael, if a socket is connected on `inproc`, can I reuse it across different threads without explicit synchronization, or should `inproc` sockets be handled with the same care as sockets connected on `tcp`, `pgm`, etc? – raffian Jun 18 '13 at 16:53
  • Does the answer to this question answer yours? http://stackoverflow.com/questions/5476308/inter-thread-comunication-using-zeromq-messages – Michael Dillon Jun 20 '13 at 01:05
  • Not really, I want to know if an `inproc` socket can be shared amongst threads; I know regular zmq sockets are definitely not thread safe, but thanks anyway. – raffian Jun 20 '13 at 01:46