2

I am reading data from a ZeroMQ socket using pyzmq. The code also uses websockets so chose to use asyncio for ZeroMQ on the subscriber side, too.

The subscriber has to read a multipart message. Instead of using socket.recv_multipart() I chose to make use of pyzmq's convenience functions for receiving and sending

import zmq
import zmq.asyncio         
        
context = zmq.asyncio.Context()
socket = context.socket(zmq.SUB)

address = await socket.recv_string()
print(socket.RCVMORE)
clock = await socket.recv_pyobj()
print(socket.RCVMORE)
data = await socket.recv_pyobj()

The publisher also uses the convenience functions, does not use asyncio:

socket.send_string('raw', zmq.SNDMORE)
socket.send_pyobj(time.time(), zmq.SNDMORE)
socket.send_pyobj(d.data)

Usually the code works fine. But sometimes it failes on the subscriber side with exceptions like

EOFError: Ran out of input

or

UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte

I am quite sure this has to do with the piecewise reading in combination with asyncio and could be solved by using await socket.recv_multipart(). But I am trying to get a deeper understanding of the ZeroMQ protocol and asyncio and would like to know why exactly it could fail.

Any ideas?

The only thread that might be related to this does not provide a good answer on why this could happen.

Edit: I came across this in the docs, but not sure it is related:

Multi-part messages

A ØMQ message is composed of 1 or more message parts. Each message part is an independent zmq_msg_t in its own right. ØMQ ensures atomic delivery of messages: peers shall receive either all message parts of a message or none at all.

Community
  • 1
  • 1
Joe
  • 6,758
  • 2
  • 26
  • 47

0 Answers0