
I created a zmq_forwarder.py that runs as a separate process and passes messages from the app to a SockJS connection. I'm currently working on how a Flask app could receive a message from SockJS via ZMQ. I'm pasting the contents of my zmq_forwarder.py below. I'm new to ZMQ and I don't know why it uses 100% CPU every time I run it.

import zmq

# Prepare our context and sockets
context = zmq.Context()

receiver_from_server = context.socket(zmq.PULL)
receiver_from_server.bind("tcp://*:5561")

forwarder_to_server = context.socket(zmq.PUSH)
forwarder_to_server.bind("tcp://*:5562")

receiver_from_websocket = context.socket(zmq.PULL)
receiver_from_websocket.bind("tcp://*:5563")

forwarder_to_websocket = context.socket(zmq.PUSH)
forwarder_to_websocket.bind("tcp://*:5564")

# Process messages from both sockets
# We prioritize traffic from the server
while True:

    # forward messages from the server
    while True:
        try:
            message = receiver_from_server.recv(zmq.DONTWAIT)
        except zmq.Again:
            break

        print("Received from server:", message)
        # recv() returns bytes, so forward them unchanged with send()
        forwarder_to_websocket.send(message)

    # forward messages from the websocket
    while True:
        try:
            message = receiver_from_websocket.recv(zmq.DONTWAIT)
        except zmq.Again:
            break

        print("Received from websocket:", message)
        forwarder_to_server.send(message)

As you can see, I've set up 4 sockets. The app connects to port 5561 to push data to ZMQ, and to port 5562 to receive from ZMQ (although I'm still figuring out how to actually set it up to listen for messages sent by ZMQ). On the other side, SockJS receives data from ZMQ on port 5564 and sends data to it on port 5563.

I've read that zmq.DONTWAIT makes receiving messages asynchronous and non-blocking, so I added it.

Is there a way to improve the code so that I don't overload the CPU? The goal is to pass messages between the Flask app and the websocket using ZMQ.

Bakuriu
bonbon.langes

1 Answer


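You are polling your two receiver sockets in a tight loop without ever blocking (because of zmq.DONTWAIT), which will inevitably max out the CPU: when no message is waiting, recv() raises zmq.Again immediately and the outer loop spins straight back around.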

Note that ZMQ has some support for polling multiple sockets in a single thread (zmq.Poller) - see this answer. I think you can adjust the timeout in poller.poll(millis) so that your code only uses lots of CPU if there are lots of incoming messages, and idles otherwise.
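As a rough sketch of the polling approach applied to the forwarder in the question (not tested against the asker's setup): the function names forward_once and run_forwarder and the 1000 ms default timeout are made up for illustration, while the ports and socket types are taken from the question. The key change is that poller.poll() blocks until a socket is readable or the timeout expires, so the process sleeps when there is nothing to forward.

```python
import zmq


def forward_once(poller, routes, timeout_ms=1000):
    """Wait for readable receivers, then forward everything that is pending.

    `routes` is a list of (receiver, forwarder) socket pairs in priority order.
    Returns the number of messages forwarded (0 if the poll timed out).
    """
    # poll() blocks here for up to timeout_ms, so there is no busy loop.
    ready = dict(poller.poll(timeout_ms))
    forwarded = 0
    for receiver, forwarder in routes:
        if not ready.get(receiver, 0) & zmq.POLLIN:
            continue
        # Drain this receiver; DONTWAIT is fine now because we only get
        # here after poll() reported that at least one message is waiting.
        while True:
            try:
                message = receiver.recv(zmq.DONTWAIT)
            except zmq.Again:
                break
            forwarder.send(message)
            forwarded += 1
    return forwarded


def run_forwarder():
    """Hypothetical entry point using the four ports from the question."""
    context = zmq.Context()

    receiver_from_server = context.socket(zmq.PULL)
    receiver_from_server.bind("tcp://*:5561")
    forwarder_to_server = context.socket(zmq.PUSH)
    forwarder_to_server.bind("tcp://*:5562")
    receiver_from_websocket = context.socket(zmq.PULL)
    receiver_from_websocket.bind("tcp://*:5563")
    forwarder_to_websocket = context.socket(zmq.PUSH)
    forwarder_to_websocket.bind("tcp://*:5564")

    poller = zmq.Poller()
    poller.register(receiver_from_server, zmq.POLLIN)
    poller.register(receiver_from_websocket, zmq.POLLIN)

    # Server traffic is listed first, so it is forwarded first.
    routes = [
        (receiver_from_server, forwarder_to_websocket),
        (receiver_from_websocket, forwarder_to_server),
    ]
    while True:
        forward_once(poller, routes)
```

Calling run_forwarder() would replace the while True loop in the question. Note that a PUSH socket's send() blocks until a peer is connected, which is the same behaviour as in the original script.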

Your other option is to use the ZMQ event loop to respond to incoming messages asynchronously, using callbacks. See the PyZMQ documentation on this topic, from which the following "echo" example is adapted:

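import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream

ctx = zmq.Context.instance()

# set up the socket, and a stream wrapped around the socket
s = ctx.socket(zmq.REP)
# bind to an interface address; the hostname 'localhost' may not resolve for bind
s.bind('tcp://127.0.0.1:12345')
stream = ZMQStream(s)

# Define a callback to handle incoming messages
def echo(msg):
    # msg is a list of message frames; in this case, just echo it back again
    stream.send_multipart(msg)

# register the callback
stream.on_recv(echo)

# start the ioloop to start waiting for messages
ioloop.IOLoop.instance().start()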
DNA
  • thanks for your response, but can you elaborate "Your other option is to use the ZMQ event loop to respond to incoming messages asynchronously, using callbacks."? – bonbon.langes Feb 21 '14 at 16:13
  • No problem - I have added a link and an example. – DNA Feb 21 '14 at 16:23
  • Hmm... I'm actually building a Flask + Tornado app, and in order to pass messages from the app to the SockJS connection via ZMQ, I have to run the ZMQ script separately. I'm already looking into the link that you shared. Hopefully I'll be able to run the ZMQ script on top of Tornado. I'll definitely let you know how it goes. Thanks for the help! – bonbon.langes Feb 21 '14 at 16:28
  • I am so grateful you showed up and answered my question. It solved my main problem! First, my problem of how my app could listen for messages has been solved. Second, the forwarder script I created can now be removed, because the websocket and the app can now communicate directly with each other. Thank you very much! – bonbon.langes Feb 21 '14 at 17:03