Is it possible to have a publisher and subscriber in one thread? Whenever I do

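import zmq

context = zmq.Context()

# Publisher: binds to the endpoint
socket_send = context.socket(zmq.PUB)
socket_send.bind("tcp://127.0.0.1:5559")

# Subscriber: connects to the same endpoint and filters on the "id1" prefix
socket_recv = context.socket(zmq.SUB)
socket_recv.connect("tcp://127.0.0.1:5559")
socket_recv.setsockopt(zmq.SUBSCRIBE, b"id1")  # topic filter must be bytes on Python 3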

the subscription does not work (i.e. messages don't arrive). When I use socket_recv.bind() instead, the sending does not work (calling bind() on both socket_send and socket_recv causes an address-already-in-use error).

Any idea how I can resolve this? I have multiple clients writing messages to the pub-sub message bus. I then use the ventilator example to distribute the messages to workers, and these workers write back to the ventilator, which sends the results back to the clients (the worker-ventilator communication is a separate channel). Perhaps there's a better communication pattern to handle this...

orange
  • You almost certainly do not really want to do this. Can you clarify a few things? You have a subscriber listening for clients to publish on their topic? Then you receive the messages and distribute them to workers, and you want to send the results back to the client via publish? And the client picks this up because it is also subscribed to the results? – sberry Dec 24 '14 at 01:45
  • From what I gather, you most likely want a Router that clients connect to and send to. Then, using a Poller, you read from the Router and write to a Dealer. Each of your workers has a reference to that Dealer socket, reads its jobs, does the work, and writes back on the Dealer. Then, in your main thread (in the Poller), you read from the Dealer and write back on the Router. Just be aware that Routers and Dealers will have multiframe messages (your target data will be in the last frame) because they use the first frames as routing information. – sberry Dec 24 '14 at 01:48
  • I think your description matches my use case. However, looking at [this example](http://zguide.zeromq.org/py:rtdealer), I have a hard time figuring out which part of the communication they describe (the client to ventilator or ventilator to workers?). Also, the fact that the workers are identified is a bit strange. – orange Dec 24 '14 at 04:23
  • Yes, you can send and receive messages on the same thread, but why would you want to when normal Python variables would do the job? I can't say why you aren't receiving messages because you haven't posted enough code. Your design is almost certainly wrong though; it would help us to help you if you could state what you're trying to do in purely functional terms, rather than in terms of probably inappropriate patterns. – John Jefferies Dec 24 '14 at 12:27
  • @JohnJefferies You seem to have misunderstood something. Obviously, I'm not trying to send data within one thread. I was asking whether the same thread can send and receive on the same pub-sub message bus - that's not the same. – orange Dec 26 '14 at 08:48
  • Thanks @sberry, your comments were very helpful. I looked into the dealer-router pattern. – orange Dec 26 '14 at 08:48

1 Answer

It's usually best to start by running a ready-made example of the pattern you want to use, just to confirm that everything works. Unfortunately, I don't see any ready-made examples in pyzmq (which I assume is the binding you're using) with pub and sub in the same thread, but I have seen and run such examples in other languages, so it's not a limitation of ZMQ and should be possible in your situation.

There are a couple of things you'll want to look at. Your code sample is very sparse, so there's no way for anyone to diagnose what's going on from it, but here are some suggestions:

  • Before trying to subscribe to a specific topic (in your case, "id1"), try subscribing to everything: socket_recv.setsockopt(zmq.SUBSCRIBE, b""). This rules out the possibility that you're not setting up the subscription properly.
  • Along the same lines, when you do subscribe to "id1", make sure your message is either a single frame message that begins with the string "id1", or it's a multi-frame message with "id1" as the first frame.
  • Don't assume the subscriber has finished connecting just because connect() has returned. ZMQ establishes the connection and forwards the subscription in the background, so a message published immediately after connect() is dropped, because the publisher does not yet know about the subscriber. Give the subscription time to take effect before you publish, or keep publishing until a message arrives.
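
The suggestions above can be put together in a minimal sketch of a PUB and a SUB socket owned by the same thread. This is only an illustration, not code from the question: the function name pubsub_same_thread, the payload, the timeout, and the use of bind_to_random_port (instead of the fixed port 5559) are assumptions made for the example.

```python
import time

import zmq


def pubsub_same_thread(topic=b"id1", payload=b"hello", timeout_ms=2000):
    """Send and receive one message on PUB/SUB sockets owned by one thread.

    Returns the received frames, or None if nothing arrived in time.
    """
    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    sub = context.socket(zmq.SUB)
    try:
        # The publisher binds; a random free port avoids "address in use".
        port = pub.bind_to_random_port("tcp://127.0.0.1")
        # The subscriber connects and installs its topic (prefix) filter.
        sub.connect("tcp://127.0.0.1:%d" % port)
        sub.setsockopt(zmq.SUBSCRIBE, topic)

        poller = zmq.Poller()
        poller.register(sub, zmq.POLLIN)
        deadline = time.time() + timeout_ms / 1000.0
        while time.time() < deadline:
            # Messages sent before the subscription reaches the publisher
            # are dropped, so keep sending until one comes back.
            pub.send_multipart([topic, payload])
            if poller.poll(50):  # wait up to 50 ms for a delivery
                return sub.recv_multipart()
        return None
    finally:
        pub.close(linger=0)
        sub.close(linger=0)
        context.term()


if __name__ == "__main__":
    print(pubsub_same_thread())
```

The retry loop is just one way to cope with the connection delay; sleeping briefly after connect() is a common alternative for quick experiments.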

As you note, you can't bind() twice on the same address, which is useful to keep in mind. Think of one side of the socket pair as the "server" (which really means the constant element) and the other side as the "client" (which really means the unreliable element). If both sides are equally constant and equally reliable, pick the one that "owns" or "originates" the data (in pub/sub, this would always be the publisher) and treat that one as the "server". Then bind() on your server and connect() on your client.

All that said... as sberry noted, your proposed use case is bi-directional communication, which doesn't seem to fit pub/sub. There are many examples of doing what you want to do in the guide; in particular, look at the reliable request/reply patterns. Your use case is similar enough that you'll probably want to use one of those as a base, and there is Python code linked throughout the descriptions of those patterns that will help you understand which code is doing what.
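
As a rough sketch of the Router/Dealer arrangement that sberry describes in the comments on the question, the following forwards client requests to worker threads and routes the replies back. Everything named here (run_broker_demo, the inproc://workers endpoint, upper-casing as the "work", the timeouts) is hypothetical and chosen only to keep the example small.

```python
import threading
import time

import zmq


def worker(context, url, stop):
    """Hypothetical worker: upper-cases each job it receives."""
    sock = context.socket(zmq.REP)
    sock.connect(url)
    while not stop.is_set():
        if sock.poll(100):  # wait up to 100 ms for a job
            sock.send(sock.recv().upper())
    sock.close(linger=0)


def run_broker_demo(requests=(b"a", b"b", b"c"), n_workers=2):
    context = zmq.Context()
    frontend = context.socket(zmq.ROUTER)  # clients connect here
    backend = context.socket(zmq.DEALER)   # workers connect here
    port = frontend.bind_to_random_port("tcp://127.0.0.1")
    backend.bind("inproc://workers")

    stop = threading.Event()
    threads = [
        threading.Thread(target=worker, args=(context, "inproc://workers", stop))
        for _ in range(n_workers)
    ]
    for t in threads:
        t.start()

    # One REQ client per request; sends are queued until the broker reads them.
    clients = []
    for body in requests:
        req = context.socket(zmq.REQ)
        req.setsockopt(zmq.RCVTIMEO, 2000)
        req.connect("tcp://127.0.0.1:%d" % port)
        req.send(body)
        clients.append(req)

    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)
    returned = 0
    deadline = time.time() + 5
    while returned < len(requests) and time.time() < deadline:
        events = dict(poller.poll(100))
        if frontend in events:
            # Frames are [client identity, empty delimiter, body].
            backend.send_multipart(frontend.recv_multipart())
        if backend in events:
            # The identity frame tells the ROUTER which client gets the reply.
            frontend.send_multipart(backend.recv_multipart())
            returned += 1

    try:
        return [req.recv() for req in clients]
    finally:
        stop.set()
        for t in threads:
            t.join()
        for sock in clients + [frontend, backend]:
            sock.close(linger=0)
        context.term()


if __name__ == "__main__":
    print(run_broker_demo())
```

Each client receives the reply to its own request regardless of which worker handled it, because the routing frames travel with the message through the broker.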

Jason
  • It took me a while to get back to this topic. Your suggestion of request/reply pattern works fine, but how does the server in this example (http://zguide.zeromq.org/page:all#Ask-and-Ye-Shall-Receive) know which client to send to (http://zguide.zeromq.org/py:hwserver)? – orange Apr 08 '15 at 23:28
  • REQ/REP is fairly limited in what it can handle. When a client sends a REQ message to a REP server, that server's next message *must* be a reply back to that same client. There is no other option. ZMQ internally keeps track of which client sent the last request, and then replies to it, releasing it to receive a new request from a new client ([see this answer for more details](http://stackoverflow.com/a/13136707/545332)). – Jason Apr 09 '15 at 13:24
  • Thanks for the pointer. This example (http://zguide.zeromq.org/py%3aasyncsrv) works well. – orange Apr 16 '15 at 04:34
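
The bookkeeping that REP does internally, as described in the comment above, becomes visible if the server uses a ROUTER socket instead, since a ROUTER hands the sender's identity to the application as the first frame. The sketch below is only an illustration under assumptions: the function name show_routing_envelope and the client names client-a and client-b are made up, and explicit identities are set purely to make the output readable.

```python
import zmq


def show_routing_envelope():
    """Reply to two REQ clients from one ROUTER using the identity frame."""
    context = zmq.Context()
    router = context.socket(zmq.ROUTER)
    sockets = [router]
    clients = {}
    try:
        router.setsockopt(zmq.RCVTIMEO, 2000)
        port = router.bind_to_random_port("tcp://127.0.0.1")

        for name in (b"client-a", b"client-b"):
            req = context.socket(zmq.REQ)
            sockets.append(req)
            # The identity must be set before connecting.
            req.setsockopt(zmq.IDENTITY, name)
            req.setsockopt(zmq.RCVTIMEO, 2000)
            req.connect("tcp://127.0.0.1:%d" % port)
            req.send(b"hello")
            clients[name] = req

        for _ in clients:
            # A REQ message arrives as [identity, empty delimiter, body].
            identity, empty, body = router.recv_multipart()
            # Sending the same envelope back routes the reply to that client.
            router.send_multipart([identity, empty, b"reply to " + identity])

        return {name: req.recv() for name, req in clients.items()}
    finally:
        for sock in sockets:
            sock.close(linger=0)
        context.term()


if __name__ == "__main__":
    print(show_routing_envelope())
```

A REP socket strips and remembers this envelope on receive and re-attaches it on send, which is why hwserver never has to name a client explicitly.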