
I have a function that performs a calculation and caches its state in a result dictionary (a mutable default argument). I run it once, then start several processes using the multiprocessing module. I need to run the function again in each of those parallel processes, but once the function has run, the cached state must be returned; the value must not be recalculated. This requirement doesn't make much sense in my example, but I can't think of a simple realistic case that would require it. Using a dict as a mutable default argument works in a single process, but it doesn't work with the multiprocessing module. What approach can I use to get the same effect?

Note that the state value (a dictionary containing class instances) is something that cannot, afaik, be passed to the multiple processes as an argument.

The SO question Python multiprocessing: How do I share a dict among multiple processes? seems to cover similar ground. Perhaps I can use a Manager to do what I need, but it is not obvious how. Alternatively, one could perhaps save the value to a global object, per https://stackoverflow.com/a/4534956/350713, but that doesn't seem very elegant.

def foo(result={}):
    if result:
        print "returning cached result"
        return result
    result[1] = 2
    return result

def parafn():
    from multiprocessing import Pool
    pool = Pool(processes=2)
    arglist = []
    foo()
    for i in range(4):
        arglist.append({})
    results = []
    r = pool.map_async(foo, arglist, callback=results.append)
    r.get()
    r.wait()
    pool.close()
    pool.join()
    return results

print parafn()

UPDATE: Thanks for the comments. I've got a working example now, posted below.

Faheem Mitha
  • You might be able to use `mmap` to share data between processes using a variation of this [answer](http://stackoverflow.com/a/11653499/355230). – martineau Sep 30 '12 at 21:24

2 Answers


I think the safest way to exchange data between processes is with a queue; the multiprocessing module provides two kinds, Queue and JoinableQueue. See the documentation:

http://docs.python.org/library/multiprocessing.html#exchanging-objects-between-processes
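A minimal sketch of the queue approach (the worker function and its payload here are illustrative, not from the question's code):

```python
from multiprocessing import Process, Queue

def worker(q):
    # Do the calculation in the child process and send the result back
    # through the queue instead of relying on shared state.
    q.put({"result": 42})

if __name__ == "__main__":
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())   # blocks until the child puts its result
    p.join()
```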

Netwave
  • It seems these queue objects have to be passed around as arguments. This won't work if the arguments are class objects. – Faheem Mitha Sep 30 '12 at 17:10
  • It will work fine if you pass the queue to the pool initializer. See Olson's answer here: http://stackoverflow.com/questions/3827065/can-i-use-a-multiprocessing-queue-in-a-function-called-by-pool-imap. Or just use Process, which accepts queues as args. – MikeHunter Sep 30 '12 at 17:41
  • @MikeHunter: I see. I was not aware one could pass arguments to the pool. If the object does not need to be pickled, then it may work. I'll try it. Thanks. – Faheem Mitha Sep 30 '12 at 19:19
  • you can use lists or dicts also by using a multiprocessing.Manager to instantiate them – andrean Sep 30 '12 at 19:19
  • @andrean: can one pass a dict argument to the pool initializer? – Faheem Mitha Sep 30 '12 at 19:27
  • @Faheem, try this: m=multiprocessing.Manager(); q, d, lst = m.Queue(), m.dict(), m.list(). All 3 are thread-safe when used with a pool and can be passed as args in pool.map_async or apply_async. Read the official docs for special handling required for the list and dict proxies. I still agree with Daniel that a queue is the safest, but you do have list and dict proxies as options. – MikeHunter Sep 30 '12 at 21:17
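For reference, a minimal sketch of the Manager-proxy idea from the comments: the dict proxy is picklable, so it can be passed to the pooled function as an ordinary argument (the function and values here are illustrative):

```python
from multiprocessing import Manager, Pool

def work(args):
    d, x = args
    if x in d:            # already computed by some process
        return d[x]
    d[x] = x * x          # the proxy forwards this write to the manager process
    return d[x]

if __name__ == "__main__":
    m = Manager()
    d = m.dict()          # shared dict, visible to all pool workers
    pool = Pool(processes=2)
    results = pool.map(work, [(d, i) for i in (1, 2, 2, 3)])
    pool.close()
    pool.join()
    print(results)   # [1, 4, 4, 9]
```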

This code won't win any beauty prizes, but it works for me. The example is similar to the one in the question, with some minor changes. The add_to_d construct is a bit awkward, but I don't see a better way to do it.

Brief summary: I copy the state of foo's d (its mutable default argument) back into foo, but into the copies of foo in the new process spaces created by the pool. Once this is done, foo in the new process spaces will not recalculate the cached values. This appears to be what the pool initializer is for, though the documentation is not very explicit.

class bar(object):
    def __init__(self, x):
        self.x = x
    def __repr__(self):
        return "<bar "+ str(self.x) +">"

def foo(x=None, add_to_d=None, d={}):
    if add_to_d:
        d.update(add_to_d)
    if x is None:
        return
    if x in d:
        print "returning cached result, d is %s, x is %s"%(d, x)
        return d[x]
    d[x] = bar(x)
    return d[x]

def finit(cacheval):
    foo(x=None, add_to_d=cacheval)

def parafn():
    from multiprocessing import Pool
    foo(1)
    pool = Pool(processes=2, initializer=finit, initargs=[foo.func_defaults[2]])
    arglist = range(4)
    results = []
    r = pool.map_async(foo, iterable=arglist, callback=results.append)
    r.get()
    pool.close()
    pool.join()
    return results

print parafn()
Faheem Mitha