2

I have created a simple client/server using pyzmq.

One thing I am not sure about is why .recv() does not receive the message even though the server has sent it. It ignores the message and raises an error, which I find strange.

Client.py

import zmq

context = zmq.Context()

try:
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:2222")
    print("Sending request")
    socket.send(b"send the message")
    # Non-blocking receive: raises ZMQError if no reply has arrived yet
    message = socket.recv(flags=zmq.NOBLOCK)
    print("Received reply %s " % message)
except Exception as e:
    print(str(e))

Server.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:2222")
while True:
    # Block until a request arrives, then acknowledge it
    message = socket.recv()
    socket.send(b"Ack")

I think the client should receive the Ack and print it instead of throwing the exception.

The documentation says:

With flags=NOBLOCK, this raises ZMQError if no messages have arrived

Clearly the server is responding with "Ack" as soon as it receives the message.

The error message is:

Resource temporarily unavailable

user3666197
Shivaraj

3 Answers

4

Remember that in concurrent environments there are no guarantees about the order of execution of independent processes. Even though server.py responds to the message immediately, the response may not reach the receiving socket before you call socket.recv. When you call socket.send, the message has to travel over the network to your server, the server has to create its reply, and the reply has to travel back over the network to your client code. That round trip takes far longer than the gap between two consecutive statements in the client, and you are calling socket.recv immediately after socket.send.

So when you call message = socket.recv(flags=zmq.NOBLOCK), the client socket has not yet received the Ack from the server, and since you are using NOBLOCK an error is raised because no message is waiting on the socket. That error is the "Resource temporarily unavailable" (EAGAIN) you are seeing.

NOBLOCK is likely not appropriate in this scenario. You can confirm that the delay is the issue by adding a sleep call between send and recv, but that is not a good long-term solution for your client code.

If you want to give up after waiting for a certain amount of time, use socket.poll instead.
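To make the failure mode concrete, here is a minimal sketch of the same client that catches the specific exception pyzmq raises for an empty non-blocking receive. The helper name recv_immediately is an assumption for illustration; zmq.Again is the ZMQError subclass pyzmq uses for EAGAIN. Setting LINGER to 0 is also an addition, so that closing the socket does not hang on the undelivered request.

```python
import zmq


def recv_immediately(endpoint, payload=b"send the message"):
    """Send a request and try to read the reply at once, without waiting.

    Returns the reply bytes, or None if nothing had arrived yet.
    (Hypothetical helper, not part of pyzmq.)
    """
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.setsockopt(zmq.LINGER, 0)   # discard unsent data on close
    try:
        socket.connect(endpoint)       # returns before the TCP connection is up
        socket.send(payload)           # queued locally, delivered in the background
        try:
            return socket.recv(flags=zmq.NOBLOCK)
        except zmq.Again as e:
            # EAGAIN: no reply has arrived yet, which is the expected case here
            print("No reply yet: %s" % e)
            return None
    finally:
        socket.close()
        context.term()


if __name__ == "__main__":
    print(recv_immediately("tcp://localhost:2222"))
```

Whether or not the server is running, this will almost always print the "No reply yet" line, because the receive is attempted before the round trip can complete.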

event = socket.poll(timeout=3000)  # wait 3 seconds
if event == 0:
    # timeout reached before any events were queued
    pass
else:
    # events queued within our time limit
    msg = socket.recv()
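Putting the poll call into a complete client might look like the sketch below. The function name request_with_timeout and its parameters are assumptions for illustration, as is setting LINGER to 0 so that the socket can be closed cleanly when the server never answers.

```python
import zmq


def request_with_timeout(endpoint, payload, timeout_ms=3000):
    """Send one request and wait up to timeout_ms for the reply.

    Returns the reply bytes, or None on timeout.
    (Hypothetical helper, not part of pyzmq.)
    """
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.setsockopt(zmq.LINGER, 0)   # do not block on close if the request is unsent
    try:
        socket.connect(endpoint)
        socket.send(payload)
        # poll() returns 0 if no event was queued within the timeout
        if socket.poll(timeout=timeout_ms, flags=zmq.POLLIN) == 0:
            return None
        return socket.recv()           # a message is ready, so this will not block
    finally:
        socket.close()
        context.term()


if __name__ == "__main__":
    reply = request_with_timeout("tcp://localhost:2222", b"send the message")
    print("Timed out" if reply is None else "Received reply %s" % reply)
```

Because a REQ socket that has sent a request cannot send another until it gets a reply, the sketch closes the socket after a timeout and creates a fresh one on the next call rather than reusing it.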

Pyzmq Doc for socket.poll()

Bobs Burgers
azundo
  • So in what case does "With flags=NOBLOCK, this raises ZMQError if no messages have arrived" apply, then? In practice there is always some delay in the network, so I am not quite sure what the use of this flag is – Shivaraj Aug 16 '19 at 19:52
  • See my update about using `poll` if you want to provide a timeout – azundo Aug 16 '19 at 21:13
  • @azundo With all due respect, how did you arrive at the conclusion that a call to the `socket.recv( zmq.NOBLOCK )` method "is likely not appropriate in this scenario"? Having spent 10+ years with ZeroMQ-based distributed computing, I am hearing such an opinion for the first time. What are your arguments for it? Did you retest the MCVE and reproduce in person all the exceptions the O/P reported? What has driven you to this opinion, and what should the steps (if any) be to achieve an appropriate solution for this scenario? Deeply surprised – user3666197 Aug 16 '19 at 21:43
1

Q: Say the server is not up. In that case the recv() in the client will block forever, which I don't want.

ZeroMQ is a fabulous framework for doing smart signaling/messaging in distributed systems.

Let's sketch a demo of a principally non-blocking modus operandi, with some inspiration on how the resources ought to be both acquired and gracefully released before process termination.

Maybe a bit of reading about the main conceptual differences in the ZeroMQ hierarchy in less than five seconds will also help.

Server.py

import time
import zmq

aContext     = zmq.Context()
aLightHouse  =    aContext.socket( zmq.PUB )
aRepSocket   =    aContext.socket( zmq.REP )
# Socket options are set before .bind() so that they apply to new connections
aRepSocket.setsockopt(             zmq.LINGER,   0 )
aLightHouse.setsockopt(            zmq.LINGER,   0 )
aLightHouse.setsockopt(            zmq.CONFLATE, 1 )
aRepSocket.bind(                  "tcp://*:2222" )
aLightHouse.bind(                 "tcp://*:3333" )
aLightHouse_counter = 0
#------------------------------------------------------------
print( "INF: Server InS: ZeroMQ({0:}) going RTO:".format( zmq.zmq_version() )  )
#------------------------------------------------------------
while True:
    try:
        aLightHouse_counter += 1
        # .send_string() encodes the text to bytes before sending
        aLightHouse.send_string( "INF: server-RTO blink {0:}".format( repr( aLightHouse_counter ) ),
                                 zmq.NOBLOCK
                                 )
        if ( 0 < aRepSocket.poll( 0, zmq.POLLIN ) ):
            try:
                message = aRepSocket.recv(         zmq.NOBLOCK ); print( "INF: .recv()ed {0:}".format( message ) )
                pass;     aRepSocket.send( b"Ack", zmq.NOBLOCK ); print( "INF: .sent() ACK" )
            except zmq.ZMQError:
                # handle EXC: based on ...
                print(  "EXC: reported as Errno == {0:}".format( zmq.zmq_errno() ) )
        else:
            # NOP / Sleep / do other system work-units to get processed during the infinite-loop
            pass
    except ( KeyboardInterrupt, zmq.ZMQError ):
        # handle EXC:
        print(  "EXC: will break ... and terminate OoS ..." )
        break
#------------------------------------------------------------
print( "INF: will soft-SIG Server going-OoS..." )
aLightHouse.send_string( "INF: server goes OoS ... " )
#------------------------------------------------------------
print( "INF: will .close() and .term() resources on clean & graceful exit..." )
time.sleep( 0.987654321 )
aRepSocket.unbind(  "tcp://*:2222" )
aRepSocket.close()
aLightHouse.unbind( "tcp://*:3333" )
aLightHouse.close()
aContext.term()
#------------------------------------------------------------
print( "INF: over and out" )

Client.py

import zmq

aContext   = zmq.Context()
aReqSocket =    aContext.socket( zmq.REQ )
aBeeper    =    aContext.socket( zmq.SUB )
# Socket options are set before .connect() so that they apply to new connections
aReqSocket.setsockopt(           zmq.LINGER,    0 )
aBeeper.setsockopt(              zmq.SUBSCRIBE, b"" )
aBeeper.setsockopt(              zmq.CONFLATE,  1 )
aReqSocket.connect(             "tcp://localhost:2222" )
aBeeper.connect(                "tcp://localhost:3333" )
#------------------------------------------------------------
print( "INF: Client InS: ZeroMQ({0:}) going RTO.".format( zmq.zmq_version() )  )
#------------------------------------------------------------
try:
    while True:
        if ( 0 == aBeeper.poll( 1234 ) ):
            print( "INF: Server OoS or no beep visible within a LoS for the last 1234 [ms] ... " )
        else:
            print( "INF: Server InS-beep[{0:}]".format( aBeeper.recv( zmq.NOBLOCK ) ) )
            try:
                print( "INF: Going to send a request" )
                aReqSocket.send( b"send the message", zmq.NOBLOCK )
                print( "INF: Sent. Going to poll for a response to arrive..." )
                while ( 0 == aReqSocket.poll( 123, zmq.POLLIN ) ):
                    print( "INF:  .poll( 123 ) = 0, will wait longer ... " )
                # a reply is ready at this point, so the non-blocking .recv() succeeds
                message = aReqSocket.recv( flags = zmq.NOBLOCK )
                print( "INF: Received a reply %s " % message )
            except Exception as e:
                print( "EXC: {0:}".format( str( e ) ) )
                print( "INF: ZeroMQ Errno == {0:}".format( zmq.zmq_errno() ) )
                print( "INF: will break and terminate" )
                break
except Exception as e:
    print( "EXC: {0:}".format( str( e ) ) )
finally:
    #------------------------------------------------------------
    print( "INF: will .close() and .term() resources on clean & graceful exit..." )
    aBeeper.close()
    aReqSocket.close()
    aContext.term()
    #------------------------------------------------------------
    print( "INF: over and out" )
halfer
user3666197
0

You are using non-blocking mode, which means the call raises an error to tell you that nothing can be done with the message right now and that you should try again later. In blocking mode, the call blocks until the peer connects.

This answer is from here.

Basically, if you remove flags=zmq.NOBLOCK it will work.

Update

If you want to use non-blocking mode, you should have a look at this.
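If the worry with blocking mode is that recv() may wait forever when the server is down, one option is to put a receive timeout on the socket. The sketch below assumes the zmq.RCVTIMEO socket option, which makes a blocking recv() raise zmq.Again once the timeout expires. The helper name blocking_request and its parameters are hypothetical.

```python
import zmq


def blocking_request(endpoint, payload, timeout_ms=3000):
    """Blocking request/reply that gives up after timeout_ms.

    Returns the reply bytes, or None if the server did not answer in time.
    (Hypothetical helper, not part of pyzmq.)
    """
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.setsockopt(zmq.LINGER, 0)             # do not hang on close
    socket.setsockopt(zmq.RCVTIMEO, timeout_ms)  # upper bound for a blocking recv()
    try:
        socket.connect(endpoint)
        socket.send(payload)
        return socket.recv()                     # blocks for at most timeout_ms
    except zmq.Again:
        return None                              # timed out: server down or too slow
    finally:
        socket.close()
        context.term()


if __name__ == "__main__":
    reply = blocking_request("tcp://localhost:2222", b"send the message")
    print("No answer" if reply is None else "Received reply %s" % reply)
```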

Omar Al-Howeiti
  • Yes, if I remove flags=zmq.NOBLOCK, it will work provided the server is up and running. Say the server is not up; in that case the recv() in the client will block forever, which I don't want. But at the same time I want to see whether the reply has been received – Shivaraj Aug 16 '19 at 19:48
  • I'm not sure what you want exactly, but if you are using blocking mode you can check whether the server script is running before sending the message, to avoid blocking forever. – Omar Al-Howeiti Aug 17 '19 at 08:37