2

I am trying to use ZeroMQ for multiprocessing. I want to stream files from a tar file, so I used the streamer device. Below is an example of what I want to do.

import time
import zmq
from zmq.devices.basedevice import ProcessDevice
from multiprocessing import Process

def server(frontend_port, number_of_workers):
    """Push ten JSON task messages, then one 'STOP' sentinel per worker.

    Connects a PUSH socket to the streamer device's frontend on localhost.

    :param frontend_port: TCP port the streamer device's PULL side is bound to.
    :param number_of_workers: how many 'STOP' messages to send, one per worker.
    :returns: True once every message has been handed to ZeroMQ.
    """
    # Close the socket and terminate the context explicitly instead of relying
    # on garbage collection when the process exits. Terminating the context
    # waits (according to the socket's LINGER option) for queued outgoing
    # messages to be delivered.
    with zmq.Context() as context, context.socket(zmq.PUSH) as socket:
        socket.connect("tcp://127.0.0.1:%d" % frontend_port)

        for i in range(10):
            socket.send_json('#%s' % i)
        # One sentinel per worker so that every worker's receive loop ends.
        for _ in range(number_of_workers):
            socket.send_json('STOP')
    return True

def worker(work_num, backend_port):
    """Print JSON messages pulled from the streamer until 'STOP' arrives.

    :param work_num: identifier of this worker, used only in the printed output.
    :param backend_port: TCP port the streamer device's PUSH side is bound to.
    """
    # Context managers guarantee the socket is closed and the context is
    # terminated when the loop ends, rather than at garbage collection.
    with zmq.Context() as context, context.socket(zmq.PULL) as socket:
        socket.connect("tcp://127.0.0.1:%d" % backend_port)

        while True:
            message = socket.recv_json()
            if message == 'STOP':
                break
            print("Worker #%s got message! %s" % (work_num, message))
            # Pause one second per message (presumably a stand-in for real work).
            time.sleep(1)

def main():
    """Run the streamer device, start the workers and the server, then wait.

    The device pulls from ``frontend_port`` (fed by ``server``) and pushes to
    ``backend_port`` (consumed by the ``worker`` processes).
    """
    frontend_port, backend_port = 7559, 7560
    number_of_workers = 2

    device = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
    device.bind_in("tcp://127.0.0.1:%d" % frontend_port )
    device.bind_out("tcp://127.0.0.1:%d" % backend_port)
    # NOTE(review): ZMQ_IDENTITY only matters to ROUTER peers; it likely has
    # no effect on PULL/PUSH sockets -- confirm before relying on it.
    device.setsockopt_in(zmq.IDENTITY, b'PULL')
    device.setsockopt_out(zmq.IDENTITY, b'PUSH')
    device.start()

    workers = [
        Process(target=worker, args=(n, backend_port))
        for n in range(number_of_workers)
    ]
    for proc in workers:
        proc.start()

    # Presumably gives the workers time to connect before any task is sent.
    time.sleep(1)

    producer = Process(target=server, args=(frontend_port, number_of_workers))
    producer.start()
    producer.join()
    for proc in workers:
        proc.join()

if __name__ == '__main__':
    main()

This code works properly. However, I want to use send_multipart() to send a tuple or a list of items with different types, such as [string, numpy_array, integer], and JSON can't handle NumPy arrays. I am avoiding pickle because I need this to be as fast as possible. I also tried converting the array to bytes, but it didn't work (maybe I was doing it wrong; I am not sure). I would appreciate a working code snippet. Ideally, I want to do something like this:

socket.send_multipart([string, numpy_array, integer])

So I want to know the most efficient way of doing this.

I am using Python 3.6

Ehsan Fathi
  • 598
  • 5
  • 21
  • 1
    Possible duplicate of [How can I serialize a numpy array while preserving matrix dimensions?](https://stackoverflow.com/questions/30698004/how-can-i-serialize-a-numpy-array-while-preserving-matrix-dimensions) – a_guest Jan 06 '19 at 17:51
  • In addition to the above linked question there are many other possibilities, such as `tostring` or `tobytes` and saving the shape separately. Also `tolist` is an option. – a_guest Jan 06 '19 at 17:54
  • @a_guest I have tried options in the second comment. `tobytes` was better. Thanks for providing the link. I'll give a try to `MsgPack` discussed [here](https://stackoverflow.com/a/48347205/6701576) – Ehsan Fathi Jan 06 '19 at 19:11

1 Answer

0

`msgpack` together with `msgpack_numpy` is the best option I could find. Try this:

import time
import zmq
from zmq.devices.basedevice import ProcessDevice
from multiprocessing import Process
import numpy as np
import msgpack
import msgpack_numpy as m

def server(frontend_port, number_of_workers):
    """Push ten msgpack-encoded ``(array, int, str)`` tasks, then STOP sentinels.

    Each task is packed with ``msgpack_numpy.encode`` as the ``default`` hook so
    the NumPy array can be rebuilt on the receiving side. After the tasks, one
    ``(b'STOP', b'STOP')`` message is sent per worker.

    :param frontend_port: TCP port the streamer device's PULL side is bound to.
    :param number_of_workers: number of STOP messages to send (one per worker).
    :returns: True once every message has been handed to ZeroMQ.
    """
    # Placeholder payload fields; they do not change between iterations, so
    # they (and the STOP message) are built once, outside the loops.
    file_name = 'image file name or any other srting'
    number = 10 # just an instance of an integer
    stop_msg = msgpack.packb((b'STOP', b'STOP'), default=m.encode, use_bin_type=True)

    # Close the socket and terminate the context explicitly; terminating the
    # context waits (per LINGER) for queued messages to be delivered.
    with zmq.Context() as context, context.socket(zmq.PUSH) as socket:
        socket.connect("tcp://127.0.0.1:%d" % frontend_port)

        for i in range(10):
            arr = np.array([[[i, i], [i, i]], [[i, i], [i, i]]])
            msg = msgpack.packb((arr, number, file_name), default=m.encode, use_bin_type=True)
            socket.send(msg, copy=False)
            time.sleep(1)

        for _ in range(number_of_workers):
            socket.send(stop_msg, copy=False)
    return True

def worker(work_num, backend_port):
    """Receive and decode msgpack tasks from the streamer until a STOP sentinel.

    Task messages unpack to ``(arr, number, file_name)``. The sentinel is
    recognised by its second element being ``b'STOP'``.

    :param work_num: identifier of this worker, used only in the printed output.
    :param backend_port: TCP port the streamer device's PUSH side is bound to.
    :returns: True after the STOP sentinel has been received.
    """
    # Context managers guarantee the socket is closed and the context is
    # terminated when the loop ends, rather than at garbage collection.
    with zmq.Context() as context, context.socket(zmq.PULL) as socket:
        socket.connect("tcp://127.0.0.1:%d" % backend_port)

        while True:
            task = msgpack.unpackb(
                socket.recv(),
                object_hook=m.decode,  # rebuild arrays packed with m.encode
                use_list=False,        # msgpack arrays come back as tuples
                max_bin_len=50000000,  # allow bin payloads up to 50,000,000 bytes
                raw=False,             # msgpack str -> Python str; bin stays bytes
            )
            if task[1] == b'STOP':
                break
            # Unpacking also checks that a task has exactly three fields.
            (arr, number, file_name) = task
            print("Worker ",work_num,  'got message!', file_name)
    return True

def main():
    """Run the streamer device, start the workers and the server, then wait.

    ``msgpack_numpy.patch()`` is applied first. Note that ``server`` and
    ``worker`` also pass ``m.encode``/``m.decode`` explicitly.
    """
    m.patch()
    frontend_port, backend_port = 3559, 3560
    number_of_workers = 2

    device = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
    device.bind_in("tcp://127.0.0.1:%d" % frontend_port )
    device.bind_out("tcp://127.0.0.1:%d" % backend_port)
    # NOTE(review): ZMQ_IDENTITY only matters to ROUTER peers; it likely has
    # no effect on PULL/PUSH sockets -- confirm before relying on it.
    device.setsockopt_in(zmq.IDENTITY, b'PULL')
    device.setsockopt_out(zmq.IDENTITY, b'PUSH')
    device.start()

    workers = [
        Process(target=worker, args=(n, backend_port))
        for n in range(number_of_workers)
    ]
    for proc in workers:
        proc.start()

    # Presumably gives the workers time to connect before any task is sent.
    time.sleep(1)

    producer = Process(target=server, args=(frontend_port, number_of_workers))
    producer.start()
    producer.join()
    for proc in workers:
        proc.join()

if __name__ == '__main__':
    main()
Ehsan Fathi
  • 598
  • 5
  • 21