
How can I get returned values from a method in another instance of multiprocessing.Process?

I have two files:

file hwmgr.py:

import multiprocessing as mp
from setproctitle import setproctitle
import smbus
import myLoggingModule as log

class HWManager(mp.Process):
    def __init__(self, cmd_q, res_q):
        mp.Process.__init__(self)
        self.i2c_lock = mp.Lock()
        self.commandQueue = cmd_q
        self.responseQueue = res_q
    def run(self):
        setproctitle('hwmgr')
        while True:
            cmd, args = self.commandQueue.get()
            if cmd is None: self.terminate()
            method = self.__getattribute__(cmd)
            result = method(**args)
            if result is not None:
                self.responseQueue.put(result)

    def get_voltage(self):
        with self.i2c_lock:
            voltage = 12.0  # placeholder: do i2c stuff with the smbus module to read a voltage
        return voltage

file main.py:

import multiprocessing as mp
import hwmgr

cmd_q = mp.Queue()
res_q = mp.Queue()

hwm = hwmgr.HWManager(cmd_q, res_q)
hwm.start()

cmd_q.put(('get_voltage', {}))
battery = res_q.get()

print battery

While this solution works, the complexity of the HWManager process is likely to grow in the future, and other processes spawned off main.py (the code here is simplified) use the same mechanism. There's obviously a chance that the wrong process will get the wrong return data from its res_q.get() call.

What would be a more robust way of doing this?
(I'm trying to avoid having one return mp.Queue per client process, as this would require reworking the HWManager class each time to accommodate an additional Queue.)
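
For illustration only, here is a rough, untested sketch of one alternative: send a dedicated reply queue along with each command, so HWManager answers on whatever queue arrived with the request. Note that a plain mp.Queue can't be pickled through another Queue, hence the Manager().Queue() proxy; the request helper name is just for this sketch, not code I'm actually running:

import multiprocessing as mp

manager = mp.Manager()
cmd_q = mp.Queue()

def request(cmd, args):
    # Per-request reply channel; a Manager().Queue() proxy is picklable,
    # so it can travel through cmd_q alongside the command.
    reply_q = manager.Queue()
    cmd_q.put((cmd, args, reply_q))   # HWManager would do: reply_q.put(result)
    return reply_q.get()

# battery = request('get_voltage', {})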

OK - WIP code is as follows:

hwmgr.py:

import multiprocessing as mp
from multiprocessing.connection import Listener
from setproctitle import setproctitle
import smbus

class HWManager(mp.Process):
    def __init__(self):
        mp.Process.__init__(self)
        self.i2c_lock = mp.Lock()

    def run(self):
        setproctitle('hwmgr')
        self.listener = Listener('/tmp/hw_sock', 'AF_UNIX')

        with self.i2c_lock:
            pass  # Set up I2C bus to take ADC readings

        while True:
            conn = self.listener.accept()
            cmd, args = conn.recv()

            if cmd is None: self.terminate()
            method = self.__getattribute__(cmd)
            result = method(**args)
            conn.send(result)

    def get_voltage(self):
        with self.i2c_lock:
            voltage = 12.0  # Actually, do i2c stuff to get a voltage with smbus module

        return voltage

file client.py:

import multiprocessing as mp
from multiprocessing.connection import Client
from setproctitle import setproctitle
from time import sleep

class HWClient(mp.Process):

    def __init__(self):
        mp.Process.__init__(self)
        self.client = Client('/tmp/hw_sock', 'AF_UNIX')

    def run(self):
        setproctitle('client')
        while True:
            self.client.send(('get_voltage', {}))
            battery = self.client.recv()
            print battery
            sleep(5)

main.py:

import hwmgr
import client

cl = client.HWClient()  # Put these lines here = one error (conn refused)
cl.start()
hwm = hwmgr.HWManager()
hwm.start()
# cl = client.HWClient()  # ...Or here, gives the other (in use)
# cl.start()
BugSpray
  • Your situation is a little confusing. If you need two processes to talk to each other exclusive of other processes, then the _Queue_ or _Pipe_ between each process is OK, but it seems like you want to broadcast. In which case I would write a _PubSub_ type of system that you can broadcast messages to; other processes would receive all messages but could be set to only respond to messages they care about. In this situation each child process would _subscribe_ to the _publishers_ that they care about. This could be done via a _Pipe_ or a shared object from a _Manager_ object. – sean Apr 01 '14 at 02:50
  • The real application of this is a hardware manager process to control access to hardware resources. The mechanism above shows only one other process (in this case, the 'main' one) wanting to get I2C data. Another spawned process might request to toggle a relay, for example, and the returned value would be a T/F for succeed/fail. The common process is the hardware manager - which can receive commands on a single mp.Queue, but the return values go to different places. I don't want to rewrite HWManager each time there's a new process requesting a service, just to provide a new return Queue. – BugSpray Apr 01 '14 at 03:02

1 Answer


This sounds like it calls for a standard client-server architecture. You can use UNIX domain sockets (or named pipes on Windows). The multiprocessing module makes it super easy to pass Python objects between processes. Sample structure of the server code:

from multiprocessing.connection import Listener
from threading import Thread
from Queue import Queue   # on Python 3: from queue import Queue

num_worker_threads = 4

def execute_cmd(cmd):
    # Placeholder: dispatch the command and return its result
    return 'result for %r' % (cmd,)

listener = Listener('somefile', 'AF_UNIX')

queue = Queue()

def worker():
    while True:
        conn, cmd = queue.get()
        result = execute_cmd(cmd)
        conn.send(result)      # reply on the connection the command arrived on
        queue.task_done()


for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()


while True:
    conn = listener.accept()
    cmd = conn.recv()
    queue.put((conn, cmd))  # workers process the queue and write the result back to conn

The client side would look like:

from multiprocessing.connection import Client
client = Client('somefile', 'AF_UNIX')

client.send(cmd)
result = client.recv()

The above code uses threads for workers, but you could just as easily have processes for workers using the multiprocessing module. See the docs for details.
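
For instance, a rough sketch of a per-connection worker process might look like the following (this assumes the default fork start method on Linux, so the accepted connection can be handed to the child; handle_client and execute_cmd are placeholders, as above):

from multiprocessing import Process
from multiprocessing.connection import Listener

def execute_cmd(cmd):
    return 'result for %r' % (cmd,)   # placeholder command dispatch

def handle_client(conn):
    # Placeholder worker: serve one client until it disconnects
    try:
        while True:
            cmd = conn.recv()
            conn.send(execute_cmd(cmd))
    except EOFError:
        conn.close()

listener = Listener('somefile', 'AF_UNIX')
while True:
    conn = listener.accept()
    p = Process(target=handle_client, args=(conn,))
    p.daemon = True
    p.start()
    conn.close()   # the child process keeps its own handle to the connection

With one process per client, each client has its own connection and its own worker, so a reply can only ever go back to the client that sent the command.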

spinlok
  • So, I would replace `queue.append((conn, cmd))` with the code to run a method in the "server"? Doesn't this still have the possibility that if two processes are waiting on a `recv()`, they could confuse each other's return data? – BugSpray Apr 01 '14 at 04:31
  • No, a connection object is unique to every client process, just like sockets. The programming model is intentionally very similar to the sockets API. You would write the result of the cmd to conn, so the right client process gets the result. – spinlok Apr 01 '14 at 05:36
  • @BugSpray I have added some more detail about how this could work. – spinlok Apr 01 '14 at 05:43
  • Ah, OK. So, just to confirm, I can use the same `Client('somefile', 'AF_UNIX')` in all client applications (no need to use a different `address` argument for each client)? Sorry, the API docs are not clear. – BugSpray Apr 01 '14 at 05:44
  • Yes, you **have** to use the same domain socket address (just a name) in all client processes. Under the covers, it is just a named pipe. See http://stackoverflow.com/questions/9644251/how-do-unix-domain-sockets-differentiate-between-multiple-clients for more details on this. – spinlok Apr 01 '14 at 06:01
  • I can see how this works for a true socket. When using a file path, however, one of two things happens - either I get "socket.error: Connection refused" or "Address already in use". This code is destined for a Linux system, and I've already found I have to create a valid file for the address parameter. – BugSpray Apr 01 '14 at 07:05
  • The difference between these two errors is whether I create the `Client` before instantiating my hardware manager (which in turn makes its own `Listener`), or set up the `Client` just after calling `hwmgr.start()`. Errors, either way! :-( – BugSpray Apr 01 '14 at 07:06
  • Which linux are you using? I tried it out on my Mac and it runs, no problem at all. Could you post your code? I think you may be trying to bind to the address multiple times – spinlok Apr 01 '14 at 07:20
  • OK, added WIP code, please see if I've misunderstood something. So, even if I had another client process - possibly spawned off main.py - which used the same mechanics and socket file, there's no chance of a clash with the return values? – BugSpray Apr 01 '14 at 23:18
  • I've got this mechanism to work with a port on '127.0.0.1', but can't get it working from a file object as the socket. Can't find any info on this application; will accept IP/port as next best thing. – BugSpray Apr 03 '14 at 03:06
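
A minimal sketch of how the WIP code above might avoid both errors on Linux. The guesses here: a stale /tmp/hw_sock left over from a previous run would explain both the "Connection refused" and the "Address already in use" messages, so the manager unlinks it before binding, and the client connects from inside run() with a short retry, so the connect happens in the child after the listener exists:

import os
import multiprocessing as mp
from multiprocessing.connection import Listener, Client
from time import sleep

SOCK = '/tmp/hw_sock'

class HWManager(mp.Process):
    def run(self):
        if os.path.exists(SOCK):
            os.unlink(SOCK)              # clear a stale socket file from an earlier run
        listener = Listener(SOCK, 'AF_UNIX')
        while True:
            conn = listener.accept()
            cmd, args = conn.recv()
            conn.send(getattr(self, cmd)(**args))

    def get_voltage(self):
        return 12.0                      # placeholder for the real i2c read

class HWClient(mp.Process):
    def run(self):
        # Connect in the child, after the manager has had a chance to bind;
        # retry briefly instead of assuming a particular startup order.
        while True:
            try:
                client = Client(SOCK, 'AF_UNIX')
                break
            except (IOError, OSError):
                sleep(0.1)
        client.send(('get_voltage', {}))
        print client.recv()

if __name__ == '__main__':
    hwm = HWManager(); hwm.start()
    cl = HWClient(); cl.start()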