
I've been playing around with a Pool object, using an instance method as the func argument, and its behavior with regard to instance state has surprised me: the instance seems to get reset on every chunk. E.g.:

import multiprocessing as mp
import logging

class Worker(object):
    def __init__(self):
        self.consumed = set()

    def consume(self, i):
        if i not in self.consumed:
            logging.info(i)
            self.consumed.add(i)

if __name__ == '__main__':
    n = 1
    logging.basicConfig(level='INFO', format='%(process)d: %(message)s')
    worker = Worker()

    with mp.Pool(processes=2) as pool:
        pool.map(worker.consume, [1] * 100, chunksize=n)

If n is set to 1, then 1 gets logged every time; if n is set to 20, it's logged 5 times (once per chunk); etc. What is the reason for this, and is there any way around it? I also wanted to use the initializer pool argument with an instance method but hit similar issues.

slushi
  • [Processes do not share state. Sharing state between processes is not trivial](https://docs.python.org/3.6/library/multiprocessing.html#sharing-state-between-processes). – juanpa.arrivillaga Jan 19 '18 at 01:42
  • Apart from the reason, what are you trying to achieve? – user1767754 Jan 19 '18 at 01:59
  • @juanpa.arrivillaga I am not trying to share state. I would just like the instance state to be consistent in the new process rather than resetting on every chunk. – slushi Jan 19 '18 at 02:46
  • @user1767754 Basically I have a service where I want to take in a class as a "processor" that would get executed in each worker process. It would be nice if the class could maintain state in each process. Ideally I could use the initializer argument to call a "start" instance method on the class instance as well. – slushi Jan 19 '18 at 02:48
  • Every chunk creates a new `Worker` in memory, and the jobs within that chunk share it. So with chunksize `2` and `10` jobs, you get `5` `Worker` copies, each shared by `2` jobs. – user1767754 Jan 19 '18 at 02:57

1 Answer


The instance method worker.consume is passed to the worker processes on a queue, and to accomplish this it must be pickled. The same pickle string is sent with every chunk of work, but a new instance is created each time that string is unpickled. You can see the gist of what's going on here, without any multiprocessing:

In [1]: import pickle

In [2]: class Thing:
   ...:     def __init__(self):
   ...:         self.called = 0
   ...:     def whoami(self):
   ...:         self.called += 1
   ...:         print("{} called {} times".format(self, self.called))

In [3]: pickled = pickle.dumps(Thing().whoami)

In [4]: pickle.loads(pickled)()
<__main__.Thing object at 0x10a636898> called 1 times

In [5]: pickle.loads(pickled)()
<__main__.Thing object at 0x10a6c6550> called 1 times

In [6]: pickle.loads(pickled)()
<__main__.Thing object at 0x10a6bd940> called 1 times

The id of each Thing instance is different, and each has its own called attribute. The same thing happens inside the pool: each chunk unpickles a fresh Worker, so its consumed set starts out empty again.

Nathan Vērzemnieks
  • Right, but the question is why the pool unpickles a new instance per chunk, and is there any workaround to "solve" this problem? I suppose maybe using globals and having the pickled func just access the global? I'd love to avoid globals though. – slushi Jan 19 '18 at 09:15
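
For what it's worth, here is a minimal sketch of the global-plus-initializer pattern that last comment describes (the module-level _worker variable, init_worker, and the top-level consume function are illustrative names, not from the original post). Each pool process builds its own Worker once in the initializer, and the function handed to map is a plain module-level function, which pickles by reference rather than by value:

import multiprocessing as mp
import logging

class Worker(object):
    def __init__(self):
        self.consumed = set()

    def consume(self, i):
        if i not in self.consumed:
            logging.info(i)
            self.consumed.add(i)

_worker = None

def init_worker():
    # Runs once in each pool process; the Worker then lives for the
    # lifetime of that process instead of being re-created per chunk.
    global _worker
    _worker = Worker()

def consume(i):
    # A module-level function is pickled by reference, so no Worker
    # instance travels with each chunk.
    _worker.consume(i)

if __name__ == '__main__':
    logging.basicConfig(level='INFO', format='%(process)d: %(message)s')

    with mp.Pool(processes=2, initializer=init_worker) as pool:
        pool.map(consume, [1] * 100, chunksize=1)

With this, each worker process logs 1 at most once, regardless of chunksize, because the consumed set persists for the life of the process. It still relies on a module-level global, which the comment above hoped to avoid, but that global is private to each worker process.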