
Perhaps someone more fluent in Python's multiprocessing Pool code could help me out. I am trying to connect to several hosts on my network simultaneously (N at any one time) over a socket connection and execute some RPCs. As one host finishes, I want to add the next host into the Pool, until all are complete.

I have a class, HClass, with some methods to do this, and a list of hostnames contained in hostlist, but I am failing to grok the docs.python.org examples for Pool well enough to get this working.

A short snippet of code to illustrate what I've got so far:

hostlist = [h1, h2, h3, h4, ....]
poolsize = 2

class HClass:
  def __init__(self, hostname="default"):
    self.hostname = hostname

  def go(self):
      # do stuff
      # do more stuff
  ....

if __name__ == "__main__":
  objs = [HClass(hostname=current_host) for current_host in hostlist]
  pool = multiprocessing.pool(poolsize)
  results = pool.apply_async(objs.go())

So far I am blessed with this traceback:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'generator'>: attribute lookup __builtin__.generator failed

The process then just hangs until I Control-C out of it.

jfofo
3 Answers


I would try to keep interprocess communication down to a minimum. It looks like all you really need to send is the hostname string:

for host in hostlist:
    pool.apply_async(worker, args = (host,), callback = on_return)

For example,

import multiprocessing as mp
import time
import logging

logger = mp.log_to_stderr(logging.INFO)

hostlist = ['h1', 'h2', 'h3', 'h4']*3
poolsize = 2

class HClass:
    def __init__(self, hostname="default"):
        self.hostname = hostname

    def go(self):
        logger.info('processing {h}'.format(h = self.hostname))
        time.sleep(1)
        return self.hostname

def worker(host):
    # only the hostname string is sent to the child process;
    # the HClass instance is built there
    h = HClass(hostname = host)
    return h.go()

result = []
def on_return(retval):
    # the callback runs in the parent process, so collecting
    # results in a plain list is safe
    result.append(retval)

if __name__ == "__main__":
    pool = mp.Pool(poolsize)
    for host in hostlist:
        pool.apply_async(worker, args = (host,), callback = on_return)
    pool.close()    # no more tasks will be submitted
    pool.join()     # wait for all outstanding tasks to finish
    logger.info(result)
unutbu
  • unutbu, you are exactly right. I think I was trying to skip steps/keep things simpler (at least in my head) by not making that go() function (your worker) outside the class. Thanks! – jfofo Jan 02 '13 at 12:40

I think this is the same question as Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map().

Copied from the answers in the above link: the problem is that multiprocessing must pickle things in order to sling them among processes, and bound methods are not picklable.

One approach is to make the go function unbound, i.e. move it out of the class. Alternatively, you can register a pickler for bound methods with copy_reg.
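
For reference, a minimal, untested sketch of the copy_reg route on Python 2 (the helper names here are just illustrative). It registers a reducer for bound methods so the Pool can pickle something like h.go:

import copy_reg
import types

def _pickle_method(method):
    # reduce a bound method to (method name, instance)
    return _unpickle_method, (method.im_func.__name__, method.im_self)

def _unpickle_method(func_name, obj):
    # re-bind the method by ordinary attribute lookup on the instance
    return getattr(obj, func_name)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

With that registered, passing a bound method such as h.go to apply_async should no longer trip the PicklingError, provided the instance itself is picklable.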

jinghli

I agree with @unutbu's solution... simpler is better. However, if you did have to send the class method go, I'd use pathos.multiprocessing instead of multiprocessing.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y):
...     return x+y
...
>>> t = Test()
>>> x = [0, 1, 2, 3]
>>> y = [4, 5, 6, 7]
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

Get the code here: https://github.com/uqfoundation/pathos
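
As a rough, untested sketch of how that could look for the question's HClass (reusing hostlist and poolsize from the question, and assuming pathos is installed) — pathos serializes with dill, which can handle lambdas and bound methods:

from pathos.multiprocessing import ProcessingPool as Pool

if __name__ == "__main__":
    pool = Pool(poolsize)
    objs = [HClass(hostname=h) for h in hostlist]
    # dill can serialize the bound go() calls, so no module-level worker is needed
    results = pool.map(lambda obj: obj.go(), objs)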

Mike McKerns