I have implemented a Python socket server. It sends image data from multiple cameras to a client. My request handler class looks like:

class RequestHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        while True:
            data = self.request.recv(1024)
            if data.endswith('0000000050'): # client requests data

                for camera_id, camera_path in _video_devices.iteritems():
                    message = self.create_image_transfer_message(camera_id, camera_path)
                    self.request.sendto(message, self.client_address)

    def create_image_transfer_message(self, camera_id, camera_path):
        # somecode ...

I am forced to stick to the socket server because of the client. It works, but the messages are created sequentially, so there are large delays between the camera images being uploaded. I would like to create the transfer messages in parallel, with a small delay between the calls.

I tried to use the pool class from multiprocessing:

import multiprocessing

class RequestHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        ...
        pool = multiprocessing.Pool(processes=4)
        messages = [pool.apply(self.create_image_transfer_message, args=(camera_id, camera_path))
                    for camera_id, camera_path in _video_devices.iteritems()]

But this throws:

PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Is there another way to create those transfer messages in parallel, with a defined delay between the calls?

EDIT:

I create the response messages using data from multiple cameras. The problem is that if I run the image-grabbing routines too close to each other, I get image artifacts because the USB bus is overloaded. I found that calling the image grabbing sequentially with a 0.2 s delay solves the problem. The cameras are not sending data the whole time the image-grabbing function is running, so delayed parallel calls result in good images with only a small delay between them.

karlitos
  • You've come up against Python's multiprocessing weakness. Python processes communicate (behind the scenes) by pickling objects and sending them back and forth across pipes. `pickle` has [limitations on what it can and can't process](https://docs.python.org/2/library/pickle.html#what-can-be-pickled-and-unpickled). In your case, you have a function or class somewhere that is not defined at the module level that is choking up the pickler. – Joel Cornett Oct 17 '14 at 13:52
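
To illustrate the pickling limitation described in the comment above, here is a minimal sketch. The names `module_level_function` and `can_pickle` are made up for this example. A function defined at module level is pickled by reference to its name, whereas a lambda has no importable name and is rejected.

```python
import pickle


def module_level_function(x):
    # Defined at module level, so pickle can store it by its qualified name.
    return x * 2


def can_pickle(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# Compare a module-level function with a lambda.
print(can_pickle(module_level_function))
print(can_pickle(lambda x: x * 2))
```

The same check can be applied to whatever callable is handed to the pool, to see whether it is the object the pickler is choking on.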

2 Answers

I think you're on the right path already, no need to throw away your work.

Here's an approach for using a class method with multiprocessing, which I found by searching for "multiprocessing class method":

from multiprocessing import Pool
import SocketServer
import time

def unwrap_self_f(arg, **kwarg):
    # Module-level wrapper, so the pool can pickle the callable by name.
    return RequestHandler.create_image_transfer_message(*arg, **kwarg)

class RequestHandler(SocketServer.BaseRequestHandler):

    @classmethod
    def create_image_transfer_message(cls, camera_id, camera_path):
        # your logic goes here
        pass

    def handle(self):
        while True:
            data = self.request.recv(1024)
            if not data.endswith('0000000050'):  # not a data request, keep waiting
                continue

            pool.map(unwrap_self_f,
                (
                    (camera_id, camera_path)
                    for camera_id, camera_path in _video_devices.iteritems()
                )
            )

# Create the pool only after the wrapper and the class are defined,
# so that forked worker processes can look them up.
pool = Pool(processes=2)

Note that pool.map already returns the workers' return values as a list. If you start multiprocessing.Process objects yourself instead, you'll need a shared resource such as a queue to get values back; see this answer: How can I recover the return value of a function passed to multiprocessing.Process?
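
As a rough sketch of how return values come back from a pool: `map` blocks until all workers are done and returns the results as a list in input order. This example deliberately swaps in `multiprocessing.pool.ThreadPool`, a thread-backed pool with the same `map` interface, so nothing has to be pickled. `build_message`, `build_all_messages` and the device paths are placeholders, not part of the original code.

```python
from multiprocessing.pool import ThreadPool


def build_message(item):
    # Hypothetical stand-in for create_image_transfer_message.
    camera_id, camera_path = item
    return "%s:%s" % (camera_id, camera_path)


def build_all_messages(devices, workers=2):
    """Build one message per camera and return them in camera-id order."""
    pool = ThreadPool(processes=workers)
    try:
        # map blocks until every worker is done; results follow input order.
        return pool.map(build_message, sorted(devices.items()))
    finally:
        pool.close()
        pool.join()


messages = build_all_messages({0: "/dev/video0", 1: "/dev/video1"})
```

Whether threads are enough depends on the workload: they suit code that mostly waits on I/O, while CPU-heavy message creation may still call for separate processes.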

mjallday
  • Thank you very much for the tip, I will gladly check it out. My only concern is whether this "workaround" goes against good practice. I wonder if there is another way than putting the timer into the method. – karlitos Oct 16 '14 at 16:58
  • if you're not using the class just move the method off of the class. there's a quick workaround. if you're using the class then check out this existing answer - http://stackoverflow.com/a/1816969/6084 – mjallday Oct 16 '14 at 17:16
  • I am still uncertain about the timing. What I want to achieve is running a class method with different parameters in a specified interval. I tried your code and played with different setting (especially the process= parameter) but I was not able to achieve satisfying result. The method is immediately called process-times and then the code waits, till the processes finish. I would like to have something like: call a method, wait xx time , call a method without waiting for the previous result ... – karlitos Oct 16 '14 at 20:41
  • @karlitos take a look, I made the solution more specific to your use case. there's no need for the time.sleep etc. – mjallday Oct 16 '14 at 22:44
  • Hello marshal, I am still unable to get your code running. I changed return C.create_image_transfer_ ... to return RequestHandler.create_image_transfer_ ... in the return statement of unwrap_self_f, but I got this error: TypeError: unbound method create_image_transfer_message() must be called with LivePictureRequestHandler instance as first argument (got str instance instead). Editing the pool.map call to (self, camera_id, camera_path) brought another TypeError: expected string or Unicode object, NoneType found. I also edited my first post to explain exactly my issue with the timing. – karlitos Oct 17 '14 at 07:30
  • @karlitos updated the method to be a classmethod. the other option would be to instantiate an instance of the class and use that. – mjallday Oct 17 '14 at 21:29

This code did the trick for me:

import multiprocessing
import time
import SocketServer

class RequestHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        while True:
            data = self.request.recv(1024)
            if data.endswith('0000000050'): # client requests data

                process_manager = multiprocessing.Manager()
                messaging_queue = process_manager.Queue()
                jobs = []

                for camera_id, camera_path in _video_devices.iteritems():
                    p = multiprocessing.Process(target=self.create_image_transfer_message,
                                                args=(camera_id, camera_path, messaging_queue))
                    jobs.append(p)
                    p.start()
                    time.sleep(0.3)

                # wait for all processes to finish
                for p in jobs:
                    p.join()

                while not messaging_queue.empty():
                    self.request.sendto(messaging_queue.get(), self.client_address)
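
The same staggered start could also be sketched with threads instead of processes, which avoids pickling and the manager queue. This is only an illustration under assumptions: `grab_staggered` and `fake_grab` are hypothetical names, and `fake_grab` stands in for the real image-grabbing code.

```python
import threading
import time


def grab_staggered(devices, grab, delay=0.2):
    """Start one grab per camera, `delay` seconds apart, without waiting
    for earlier grabs to finish. Returns {camera_id: result}."""
    results = {}
    lock = threading.Lock()

    def worker(camera_id, camera_path):
        value = grab(camera_id, camera_path)
        with lock:
            results[camera_id] = value

    threads = []
    for camera_id, camera_path in sorted(devices.items()):
        t = threading.Thread(target=worker, args=(camera_id, camera_path))
        t.start()
        threads.append(t)
        time.sleep(delay)  # stagger the starts to spread the USB load

    # Wait for all grabs to finish before returning.
    for t in threads:
        t.join()
    return results


def fake_grab(camera_id, camera_path):
    # Placeholder for the real grabbing routine.
    time.sleep(0.05)
    return "frame from %s" % camera_path


frames = grab_staggered({0: "/dev/video0", 1: "/dev/video1"}, fake_grab, delay=0.01)
```

Keying the results by camera id keeps each message associated with its camera regardless of which grab finishes first.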
karlitos