3

I have set up two small scripts imitating a publish and subscribe procedure with pyzmq. However, I am unable to send messages over to my subscriber client using the inproc transport. I am able to use tcp://127.0.0.1:8080 fine, just not inproc.

pub_server.py

import zmq
import random
import sys
import time

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("inproc://stream")

while True:
    socket.send_string("Hello")
    time.sleep(1)

sub_client.py

import sys
import zmq

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt_string(zmq.SUBSCRIBE, '')
socket.connect("inproc://stream")

for x in range (5):
    string = socket.recv()
    print(string)

How can I successfully alter my code so that I'm able to use the inproc transport method between my two scripts?

EDIT:

I have updated my code to further reflect @larsks comment. I am still not receiving my published string - what is it that I am doing wrong?

import threading
import zmq

def pub():
    context = zmq.Context()
    sender = context.socket(zmq.PUB)
    sender.connect("inproc://hello")
    lock = threading.RLock()

    with lock:
        sender.send(b"")

def sub():
    context = zmq.Context()
    receiver = context.socket(zmq.SUB)
    receiver.bind("inproc://hello")

    pub()

    # Wait for signal
    string = receiver.recv()
    print(string)
    print("Test successful!")

    receiver.close()

if __name__ == "__main__":
    sub()
juiceb0xk
  • 949
  • 3
  • 19
  • 46

2 Answers2

8

As the name implies, inproc sockets can only be used within the same process. If you were to rewrite your client and server such that there were two threads in the same process you could use inproc, but otherwise this socket type simply isn't suitable for what you're doing.

The documentation is very clear on this point:

The in-process transport passes messages via memory directly between threads sharing a single ØMQ context.

Update

Taking a look at the updated code, the problem that stands out first is that while the documentation quoted above says "...between threads sharing a single ØMQ context", you are creating two contexts in your code. Typically, you will only call zmq.Context() once in your program.

Next, you are never subscribing your subscriber to any messages, so even in the event that everything else was working correctly you would not actually receive any messages.

Lastly, your code is going to experience the slow joiner problem:

There is one more important thing to know about PUB-SUB sockets: you do not know precisely when a subscriber starts to get messages. Even if you start a subscriber, wait a while, and then start the publisher, the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.

The pub/sub model isn't meant for single messages, nor is it meant to be a reliable transport.

So, to sum up:

  • You need to create a shared ZMQ context before you creating your sockets.
  • You probably want your publisher to publish in a loop instead of publishing a single message. Since you're trying to use inproc sockets you're going to need to put your two functions into separate threads.
  • You need to set a subscription filter in order to receive messages.

There is an example using PAIR sockets in the ZMQ documentation that might provide a useful starting point. PAIR sockets are designed for coordinating threads over inproc sockets, and unlike pub/sub sockets they are bidirectional and are not impacted by the "slow joiner" issue.

larsks
  • 277,717
  • 41
  • 399
  • 399
  • Thanks @larsks. However, I have edited my question and included my altered script in conjunction with your answer, but I am still not receiving anything that I publish. What is it that I may be doing wrong? – juiceb0xk Feb 03 '18 at 11:10
  • I've made an update that I hope points you in the right direction. – larsks Feb 03 '18 at 12:28
  • Thank you. what if the pub and sub are on different machines please? Also, what are some useful use cases for using pub-sub in the same machine over inproc please? Mostly, we use pub-sub over network to let clients receive updates from publishers. What is the point from this local pub-sub in the same machine? – Avv Oct 11 '22 at 01:02
  • I am not sure if we need a subscription filter in order to receive messages? We can just subscribe to with `""` as I can tell so far. – Avv Oct 11 '22 at 01:18
  • When you 'just subscribe to ""' you are setting a subscription filter (for "all messages"). – larsks Oct 11 '22 at 01:48
0

As mention earlier by @larsks, the context object should be the same. Declare the context object globally and use it in both pub and sub functions instead of creating new ones for each.

user52610
  • 11
  • 1
  • 4