0

I created two programs to send and receive video feed using ZeroMQ. However, the receiving program always gets stuck on the .recv()-method.

I have used two libraries of ZeroMQ for this program: one, the native zmq, the other a derived imagezmq. The imagezmq is used for sending and receiving frame data from the video while the native zmq library is used for sending and receiving the time, when the image has been sent.

The imagezmq part works fine.

The program only gets stuck on the zmq part.

Following are my two programs :

FinalCam.py

import struct
import time
import imutils
import imagezmq
import cv2
import zmq    
import socket
import pickle

#                                         # image sending
sender = imagezmq.ImageSender(connect_to='tcp://localhost:5555')

hostName = socket.gethostname()           # send RPi hostname with each image

vid_dir = "/root/redis-opencv-videostream/vtest.avi"
cap = cv2.VideoCapture(vid_dir)           # init the camera

context = zmq.Context()                   # setup for sending time
socket = context.socket(zmq.PUB)
socket.connect("tcp://localhost:6666")

while True:                               # send images as stream until Ctrl-C

    ret, frame = cap.read()
    frame = imutils.resize(frame, width=400)  # resize without compressionq

    captureTime = time.time()
    sender.send_image(hostName, frame)
    print (captureTime)
    captureTime = struct.pack('d', captureTime)
    #msg = pickle.dumps(captureTime)
    print("message primed")
    socket.send(captureTime)
    print("time sent")

which generated this output :

1591824603.5772414
message primed
time sent

FinalRecieve.py

import cv2
import imagezmq
import time
import zmq
import struct

FRAMES = 5
image_hub = imagezmq.ImageHub()           # image socket

context = zmq.Context()                   #  time socket
socket = context.socket(zmq.SUB)
socket.bind("tcp://*:6666")

while True:                               # show streamed images until Ctrl-C

    loopTime = time.time()

    for i in range (0, FRAMES):
        hostName, frame = image_hub.recv_image()
        image_hub.send_reply(b'OK')
        print("recieved image, waiting for time")
        captureTime = socket.recv()
        print("meow")
        print(captureTime)

    finishTime = time.time()
    fpsTime = finishTime - loopTime

    fps = FRAMES / fpsTime
    print(fps)

which generated this output :

received image, waiting for time
user3666197
  • 1
  • 6
  • 50
  • 92
Che Su
  • 5
  • 3

2 Answers2

2

Here’s a couple of things to try to get the native-zmq parts working:

Use .connect()-method for SUB-sockets : socket.connect("tcp://localhost:6666")

And .bind()-method for your PUB-sockets : socket.bind("tcp://*:6666")

It’s explained here in the guide that connect should be used to create an outgoing connection from a socket.

In the sibling doc for .bind(), it explains that it’s for accepting connections.

Also try setting socket options : socket.setsockopt(zmq.SUBSCRIBE, "")

It is described here in the guide that the SUB-sockets initially filter out all messages, so that’d explain why you’re not receiving anything. The example above provides an empty filter, which accepts all incoming messages.

It’s important to note that with PUB and SUB based distribution, that the SUB might not necessarily receive messages due to timing of its connection or network conditions. E.g. everything sent from the publisher before the subscriber connects isn’t receivable

JJ Hassan
  • 395
  • 3
  • 11
  • Hey there, thanks for the feedback! I tried both options with no luck. The socket.connect method resulted in an invalid argument error while the setsockopt method yielded the same result as my initial problem. Are there any other methods that might work? – Che Su Jun 11 '20 at 02:03
  • With all respect, this answer is misleading. The pure ZeroMQ part referred is by far not an API v3.2 compliant and you missed many traps hidden yet visible inside the ImageZMQ classes. Sorry, but a misdirected advice does not help @CheSu nor other StackOverflow Community Members. – user3666197 Jun 11 '20 at 02:55
  • With all due respect, you continue to ignore the facts - O/P needs to keep Camera-holding RPi-s ( perhaps due to mobility and easy re-configuration ) to get DHCP-addresses, not a static one-s, so will need to keep .connect()-towards the known, static IP-address of the ImageHub. Advising to reverse the scenario is possible, yet at devastating costs if meant to be implemented & maintained after being deployed - you advise to re-discover all ( perhaps dynamically ) changing IP-addresses of all RPi-cameras within a LoS, and somehow re-enforce the only static ImageHub-side keep re-.connect()-ing... – user3666197 Jun 11 '20 at 03:48
  • @chesu I did miss a couple of things. I think your particular error is simply to do with the SUB socket not having its options set. There are potentially timing issues at play too. Because of the way PUB-SUB works, anything PUB sends before your SUB connects isn’t visible to your SUB. Even after connecting, they could be lost. – JJ Hassan Jun 11 '20 at 03:59
  • @user3666197 the question was only about why the ZMQ parts of the program aren’t working. To know how to use ZMQ in a robust and production ready way, I would advise reading more of the guide. To help, I’m trying to give an answer that’s to the point but at least points the OP in the direction of more in-depth learning. – JJ Hassan Jun 11 '20 at 04:05
  • @JJHassan - this was obvious from the very beginning - these problems arise once users start using a blocking-mode of .recv() in all cases of uncertain presence / delivery of (just) assumed-order-of-operations. This is a common blindness for serial-code designers, that step into the domain of **distributed-computing**, where we loose all warrants of presence and all warrants of order-of-appearance **:o)** A non-blocking mode will never lead to this ( be it zmq_poll()-based or a trivial for-loop with try: / except : / finally : -crafted handlers ), but requires more work & thorough know-how ... – user3666197 Jun 11 '20 at 04:05
0

The Problem Definition :

"The imagezmq part works fine. The program only gets stuck on the zmq part. "


Solution ?

ZeroMQ is so smart ( since v2.0+ ) that is does not require anyone to be rigid on whether to { .bind() | .connect() } at all and one AccessPoint may freely .bind() some and also .connect() other TransportClass-channels for 1:N communiation topologies as liked & needed ( in other words, the "reversed" .bind()/.connect() is always possible ).

Not so the ImageZMQ-spin-off module. There are set of hard-wired choices made behind your domain of control (and any hardcore hijacking the Class-internal attributes { ImageHub.zmq_socket | ImageSender.zmq_socket } was hardly a dream of the ImageZMQ authors ( architecture-(co)-authors ) ).

Given the imagezmq published internalities are themselves pre-decided on the hardcoded choices ( the same Class instances, depending on mode, sometimes .bind() and .connect() otherwise ) was declared as working, lets focus on using it to its (published) maximum.

Based on the said, compose your working parts so as to send the time "inside" the working scheme :

PAYLOAD_MASK = "{0:}|||{1:}"
...
while ...
      sender.send_image( PAYLOAD_MASK.format( time.time(), hostName ),
                         frame
                         )

and may easily decode accordingly on .recv()-side, without a need to repair the used struct.pack() problems ( In , the more in ZeroMQ interconnected worlds, one does never know, what the remote platform is, the less what byte-ordering ( Endian-convention ) will be assumed "there", so struct.pack() shall always explicitly declare Endian-type in the format-string. Always. If not sure, one may design a self-detecting message, that may salvage such naive uses and test / adjust your format-string for your local-side usage for .unpack()-method accordingly for either case of any implicitly blind .pack()-sender ... goes beyond the scope of this post, yet all RPi / non-RPi platform projects are prone to this (only)-assumed-"same"-Endian caveat )


Surprise :

If using pyzmq version 18.., which is not listed in the compatibility list of the ImageZMQ and the use-case is there doomed to crash or cause hidden troubles.

So one ought rather start with zmq.pyzmq_version() checks (a professional policy in Configuration Management for controlled environments, isn't it?) and catch & resolve any non-matching cases.


Why the zmq part did not work ?

Hard to say. Without a full code, one may just guess. My candidate would be a wrong / missing termination with zmq.LINGER not explicitly set to zero, which causes hung-up Context()-instance(s) that block the actual resources until reboot. A proper use of the ZeroMQ tools reflects these methods of defensive-programming ( LINGER, CONFLATE, MAXMSGSIZE, IMMEDIATE, white-listing and many other defensive .setsockopt() and Context()-parametrisation options ), because is complex and may broke on many places outside of your domain of control or Line-of-Sight.

So, do not hesitate to become a defensively-programming designer, if your architected systems are to become robust and self-healing.

aLocalCONTEXT = zmq.Context( nIOthreads )
aSUBsocket = aLocalCONTEXT.socket( zmq.SUB )
try: 
      aSUBsocket.bind( "tcp://*:6666" )
except:
      ...
aSUBsocket.setsockopt( zmq.LINGER,      0 )
aSUBsocket.setsockopt( zmq.SUBSCRIBE, b"" )
aSUBsocket.setsockopt( zmq.MAXMSGSIZE, ...)
...

Best re-read the native ZeroMQ API documentation about tuning other ISO-OSI-L2/L3 related paramters for best performance and safest strategies. Worth the time doing this indeed for each new API update, all the way since the v2.1+...


A Counterexample of This ?

Look at how .send_reply() method is unprotected from crashing for non-REQ-REP instances of its own ImageHub Class.

Why?

The so far non-protected call to the .send_reply()-method will crash the application once any such call for any ImageHub-class instance, that was initialised in it non-default REQ_REP = False ( i.e. on a SUB-side of the PUB/SUB mode available ).

aNonDefaultImageHUB_instance.send( "this will crash" )
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/ms/anaconda2/lib/python2.7/site-packages/zmq/sugar/socket.py", line 395, in send
    return super(Socket, self).send(data, flags=flags, copy=copy, track=track)
  File "zmq/backend/cython/socket.pyx", line 725, in zmq.backend.cython.socket.Socket.send
  File "zmq/backend/cython/socket.pyx", line 772, in zmq.backend.cython.socket.Socket.send
  File "zmq/backend/cython/socket.pyx", line 247, in zmq.backend.cython.socket._send_copy
  File "zmq/backend/cython/socket.pyx", line 242, in zmq.backend.cython.socket._send_copy
  File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc
zmq.error.ZMQError: Operation not supported
halfer
  • 19,824
  • 17
  • 99
  • 186
user3666197
  • 1
  • 6
  • 50
  • 92
  • @halfer - removing explanatory text is possible, yet counter-intuitive for a site, that tries to explain how things work. I started to append this section to answers namely for people, who feel they understand (native, O/S ) sockets and get surprised the same words suddenly mean something different in the context of ZeroMQ Socket Archetypes. Having answered 697+ ZeroMQ & pymzq questions, I feel I can detect when such missing pieces of conceptually important ideation are needed to be put onto the table, so while you may keep deleting 'em, you just spoil the learning curves of our valued members – user3666197 Oct 24 '20 at 18:37
  • Thanks for engaging, and I understand your view. However, I don't think it takes into account that the Q&A here is deliberately encouraged to be succinct and technical, and that duplicated (copy+paste) information spread over posts is something editors try to remove from anyone's material (so you can be reassured that it is not a rule made up just for you). – halfer Oct 24 '20 at 18:44
  • I think this copy+paste material is too vague, too - it contains a link to all your answers in a particular tag. Now, of course you are within your rights to regard your material as the best example of things to read for learners, but "read my answer history" isn't particularly focussed educational advice. I recall other editors have also removed this in particular from your material (including a moderator) so I would still discourage it. Could you perhaps ask a Meta question about it, before continuing to add this boilerplate? – halfer Oct 24 '20 at 18:47
  • (I've removed the spoiler and a copy+paste of our comments, since that would obviously be removed by a moderator - hopefully you can take that steer as helpful!) – halfer Oct 24 '20 at 18:48