I'm writing examples for my ZeroMQ CLR namespace, but I have a problem with PUB/SUB.

Why do I get only the first message? Sometimes I get no message at all, but if I step through the client in the debugger (at PubSub_Client(arg);), I get some messages.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Security.Cryptography;

using ZeroMQ;

namespace ZeroMQ.Test
{
    static partial class Program
    {
        static string PubSub_FrontendAddress = "tcp://127.0.0.1:2772";

        public static void Main(string[] args)
        {
            if (args == null || args.Length < 1)
            {
                // pretend some arguments were passed...
                args = new string[] { "World" };
            }

            // Setup the ZContext
            context = ZContext.Create();

            CancellationTokenSource cancellor0 = null;

            {
                // Create the "Server" cancellor and threads
                cancellor0 = new CancellationTokenSource();

                var serverThread = new Thread(PubSub_Server);
                serverThread.Start(cancellor0.Token);
                serverThread.Join(64);
            }

            {
                Thread.Sleep(1000);
                Console.WriteLine("Starting...");

                // for each arg, act as a Client that publishes to the Server
                foreach (string arg in args)
                {
                    PubSub_Client(arg);
                    // Thread.Sleep(1000);
                }

                Console.WriteLine("Ended...");
            }

            if (cancellor0 != null)
            {
                // Cancel the Server
                cancellor0.Cancel();
            }

            // we could have called context.Terminate() here

        }
        static void PubSub_Server(object cancelluS)
        {
            var cancellus = (CancellationToken)cancelluS;

            using (var socket = ZSocket.Create(context, ZSocketType.SUB))
            {
                socket.Bind(PubSub_FrontendAddress);

                socket.SubscribeAll();

                /* var poller = ZPollItem.Create(socket, (ZSocket _socket, out ZMessage message, out ZError _error) =>
                {

                    while (null == (message = _socket.ReceiveMessage(/* ZSocketFlags.DontWait, * out _error)))
                    {
                        if (_error == ZError.EAGAIN)
                        {
                            _error = ZError.None;
                            Thread.Sleep(1);

                            continue;
                        }

                        throw new ZException(_error);
                    }

                    return true;
                }); /**/

                while (!cancellus.IsCancellationRequested)
                {
                    ZError error;
                    ZMessage request;
                    /* if (!poller.TryPollIn(out request, out error, TimeSpan.FromMilliseconds(512)))
                    {
                        if (error == ZError.EAGAIN)
                        {
                            error = ZError.None;
                            Thread.Sleep(1);

                            continue;
                        }

                        throw new ZException(error);
                    } /**/

                    if (null == (request = socket.ReceiveMessage(ZSocketFlags.DontWait, out error)))
                    {
                        if (error == ZError.EAGAIN)
                        {
                            error = ZError.None;
                            Thread.Sleep(1);

                            continue;
                        }

                        throw new ZException(error);
                    } /**/

                    foreach (ZFrame frame in request)
                    {
                        string strg = frame.ReadString();
                        Console.WriteLine("{0} said hello!", strg);
                    }
                }

                socket.Unbind(PubSub_FrontendAddress);
            }
        }
        static void PubSub_Client(string name)
        {
            using (var socket = ZSocket.Create(context, ZSocketType.PUB))
            {
                using (var crypto = new RNGCryptoServiceProvider())
                {
                    var identity = new byte[8];
                    crypto.GetBytes(identity);
                    socket.Identity = identity;
                }

                socket.Connect(PubSub_FrontendAddress);

                using (var request = new ZMessage())
                {
                    request.Add(new ZFrame(name));

                    socket.Send(request);
                }

                socket.Disconnect(PubSub_FrontendAddress);
            }
        }
    }
}
metadings
  • Are you sure the server method isn't throwing an exception? Because this is in a different thread, it will not be caught in the main thread and so just be thrown silently. – Rhumborl Jan 02 '15 at 14:54
  • @Rhumborl No, this doesn't throw exceptions. – metadings Jan 02 '15 at 14:56
  • I just get one message. If I restart the client (not the server) it's getting the message again, however not the second or third one. – metadings Jan 02 '15 at 18:13
  • Why aren't you using [NetMQ](https://github.com/zeromq/netmq)? CLRZMQ is kind of dead for ZeroMQ, and NetMQ is 100% C#. – somdoron Jan 03 '15 at 08:27
  • @somdoron clrzmq is live again, with C, C#, PGM, CURVE et al – metadings Jan 03 '15 at 14:12
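
One way to check whether the server thread is failing is to wrap the thread body in a try/catch and hand any exception back to the caller. The following is a minimal sketch that uses only the standard library; RunAndCapture is a hypothetical helper name and not part of any ZeroMQ binding.

```csharp
using System;
using System.Threading;

static class WorkerExceptionDemo
{
    // Hypothetical helper: runs `body` on a new thread and returns
    // the exception it threw, or null if it completed normally.
    public static Exception RunAndCapture(Action body)
    {
        Exception captured = null;
        var thread = new Thread(() =>
        {
            try { body(); }
            catch (Exception ex) { captured = ex; } // keep it for the caller
        });
        thread.Start();
        thread.Join(); // wait for the worker, then read what it captured
        return captured;
    }

    public static void Main()
    {
        Exception error = RunAndCapture(() =>
        {
            throw new InvalidOperationException("server failed");
        });

        Console.WriteLine(error == null
            ? "no exception"
            : "worker threw: " + error.Message);
    }
}
```

In the question's code, the body passed to such a helper would be the call to PubSub_Server, so a failure there becomes visible on the main thread.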

2 Answers

1

I'm having trouble with your design, which seems wrong:

  • A single subscriber with multiple publishers is an odd choice. I trust you have a good reason for it, but you should have said what that is. When sending messages from multiple clients to a single server, it is normal to use DEALER/ROUTER sockets instead. PUB/SUB is intended for a small set of publishers sending to a large number of subscribers.

  • A client that connects, sends one message, then immediately disconnects, is another very unusual use case that I hope is just an example:

    • For one thing, you are open to linger problems, whereby the message will be dropped on disconnect if it isn't sent within the linger timeout. [I don't know what the default linger is for your language binding, so that may or may not be an issue, but you should at least check that it isn't.]
    • For another, as you've already found, there are issues around the time it takes to connect to a socket, which may lead to PUB messages getting dropped if they are sent before the socket has properly connected.

If you insist on using PUB/SUB in this manner, you will need an out-of-band protocol to synchronise the PUB and SUB threads before the PUB messages are sent. The ZeroMQ guide has examples of how to make pub/sub reliable in this way. This involves a second set of sockets in the same threads to carry the synchronisation messages; DEALER sockets don't drop messages, which is why they are suitable for that purpose.
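
A rough sketch of what such a synchronisation could look like, under stated assumptions and not as the guide's method: instead of DEALER sockets, it uses a second PUB/SUB pair as the acknowledgement channel and has the client resend a probe until the server echoes it. It uses only binding calls that already appear in the question's code. The ack port 2773, the "SYNC " prefix, the class and helper names, and the assumption that ZContext.Create() returns a ZContext are all hypothetical.

```csharp
using System;
using System.Threading;
using ZeroMQ;

static class SyncedPubSub
{
    // Assumption: ZContext.Create() returns a ZContext, as the question's code implies.
    static readonly ZContext context = ZContext.Create();

    const string FrontendAddress = "tcp://127.0.0.1:2772"; // data: client PUB -> server SUB
    const string AckAddress = "tcp://127.0.0.1:2773";      // hypothetical: server PUB -> client SUB
    const string ProbePrefix = "SYNC ";                     // hypothetical marker for probes

    static void Send(ZSocket socket, string text)
    {
        using (var message = new ZMessage())
        {
            message.Add(new ZFrame(text));
            socket.Send(message);
        }
    }

    // Returns the first frame of a pending message, or null if nothing has arrived yet.
    static string TryReceive(ZSocket socket)
    {
        ZError error;
        ZMessage message = socket.ReceiveMessage(ZSocketFlags.DontWait, out error);
        if (message == null)
        {
            if (error == ZError.EAGAIN) return null;
            throw new ZException(error);
        }
        foreach (ZFrame frame in message) return frame.ReadString();
        return null;
    }

    public static void Server(CancellationToken cancel)
    {
        using (var data = ZSocket.Create(context, ZSocketType.SUB))
        using (var acks = ZSocket.Create(context, ZSocketType.PUB))
        {
            data.Bind(FrontendAddress);
            data.SubscribeAll();
            acks.Bind(AckAddress);

            while (!cancel.IsCancellationRequested)
            {
                string text = TryReceive(data);
                if (text == null) { Thread.Sleep(1); continue; }

                if (text.StartsWith(ProbePrefix))
                    Send(acks, text); // echo the probe so its sender knows the path works
                else
                    Console.WriteLine("{0} said hello!", text);
            }
        }
    }

    public static void Client(string name)
    {
        using (var data = ZSocket.Create(context, ZSocketType.PUB))
        using (var acks = ZSocket.Create(context, ZSocketType.SUB))
        {
            data.Connect(FrontendAddress);
            acks.Connect(AckAddress);
            acks.SubscribeAll();

            // Probes and acks may both be dropped while connecting, so keep retrying.
            string probe = ProbePrefix + Guid.NewGuid().ToString("N");
            do
            {
                Send(data, probe);
                Thread.Sleep(10);
            }
            while (TryReceive(acks) != probe);

            Send(data, name); // the server has seen a probe, so the data path is up
        }
    }
}
```

Once the client sees its own probe echoed back, it knows the server received something over the data connection, so the real message is no longer sent into an unconnected socket. The linger caveat above still applies to that final send, because the socket is disposed immediately afterwards.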

But DEALER/ROUTER sockets would appear to be a better choice than PUB/SUB, unless there is some design requirement that hasn't been disclosed.

John Jefferies
  • It is becoming an "example for my ZeroMQ CLR namespace". "Which may lead to PUB messages getting dropped if they are sent before the socket has properly connected." - But why is this so, if I do it without DONTWAIT? – metadings Jan 03 '15 at 21:21
  • PUB sockets don't honour DONTWAIT. See the API manual at http://api.zeromq.org/4-1:zmq-send; only DEALER & PUSH sockets are listed as honouring that option. PUB sockets never block when sending a message, i.e. they always behave as though DONTWAIT were on. – John Jefferies Jan 04 '15 at 10:15
  • Hey, if you're interested: I made clrzmq4 and [the Guide examples](https://github.com/metadings/zguide/tree/master/examples/C%23). – metadings Feb 03 '15 at 13:16
0

Well... There was a comment by Martin Sustrik: "The problem is that connecting is asynchronous and takes certain amount of time."

Now there is a Thread.Sleep(64) after the Connect call, and it works:

    static void PubSub_Client(string name)
    {
        using (var socket = ZSocket.Create(context, ZSocketType.PUB))
        {
            socket.Connect(PubSub_FrontendAddress);

            Thread.Sleep(64);

            using (var request = new ZMessage())
            {
                request.Add(new ZFrame(name));

                socket.Send(request);
            }

            socket.Disconnect(PubSub_FrontendAddress);
        }
    }

Do you know of a better way to wait until the connection is established?

metadings
  • I don't personally, but a quick search brought up [this question and answers](http://stackoverflow.com/q/22715840/1901857), which may be of use. – Rhumborl Jan 02 '15 at 18:34