I want a global object that is shared and updated by all processes with minimal locking.

import multiprocessing

class Counter(object):
  def __init__(self):
    self.value = 0

  def update(self, value):
    self.value += value


def update(counter_proxy, thread_id):
  counter_proxy.value.update(1)
  print counter_proxy.value.value, 't%s' % thread_id, \
    multiprocessing.current_process().name
  return counter_proxy.value.value

def main():
  manager = multiprocessing.Manager()
  counter = manager.Value(Counter, Counter())
  pool = multiprocessing.Pool(multiprocessing.cpu_count())
  for i in range(10):
    pool.apply(func = update, args = (counter, i))
  pool.close()
  pool.join()

  print 'Should be 10 but is %s.' % counter.value.value

if __name__ == '__main__':
  main()

The result is shown below: 0 instead of 10. It looks like the shared object's value is never updated. How can I lock and update such a value?

0 t0 PoolWorker-2
0 t1 PoolWorker-3
0 t2 PoolWorker-5
0 t3 PoolWorker-8
0 t4 PoolWorker-9
0 t5 PoolWorker-2
0 t6 PoolWorker-7
0 t7 PoolWorker-4
0 t8 PoolWorker-6
0 t9 PoolWorker-3
Should be 10 but is 0.

Currently the best solution, based on @dano's answer: I combined a custom manager with a custom class proxy.

import multiprocessing
from multiprocessing.managers import BaseManager, NamespaceProxy


class Counter(object):
  def __init__(self):
    self.value = 0

  def update(self, value):
    self.value += value


def update(counter_proxy, thread_id):
  counter_proxy.update(1)

class CounterManager(BaseManager):
  pass

class CounterProxy(NamespaceProxy):
  _exposed_ = ('__getattribute__', '__setattr__', '__delattr__', 'update')

  def update(self, value):
    callmethod = object.__getattribute__(self, '_callmethod')
    return callmethod(self.update.__name__, (value,))

CounterManager.register('Counter', Counter, CounterProxy)

def main():
  manager = CounterManager()
  manager.start()

  counter = manager.Counter()
  pool = multiprocessing.Pool(multiprocessing.cpu_count())
  for i in range(10):
    pool.apply(func = update, args = (counter, i))
  pool.close()
  pool.join()

  print 'Should be 10 but is %s.' % counter.value

if __name__ == '__main__':
  main()
Seanny123
Chameleon

1 Answer

The Value type provided by multiprocessing.Manager() isn't designed to be used with custom classes; it's supposed to behave like a multiprocessing.sharedctypes.Value. Instead, you need to create a custom manager and register your class with it. Your life will also be easier if you don't access value directly but read and modify it through methods, which are exported by the default Proxy created for your class. Regular attributes (like Counter.value) aren't exported, so they aren't accessible without additional customization. Here's a working example:

import multiprocessing
from multiprocessing.managers import BaseManager

class MyManager(BaseManager): pass

def Manager():
    m = MyManager()
    m.start()
    return m 

class Counter(object):
  def __init__(self):
    self._value = 0

  def update(self, value):
    self._value += value

  def get_value(self):
    return self._value

MyManager.register('Counter', Counter)

def update(counter_proxy, thread_id):
  counter_proxy.update(1)
  print counter_proxy.get_value(), 't%s' % thread_id, \
    multiprocessing.current_process().name
  return counter_proxy

def main():
  manager = Manager()
  counter = manager.Counter()
  pool = multiprocessing.Pool(multiprocessing.cpu_count())
  for i in range(10):
    pool.apply(func=update, args=(counter, i))
  pool.close()
  pool.join()

  print 'Should be 10 but is %s.' % counter.get_value()

if __name__ == '__main__':
  main()

Output:

1 t0 PoolWorker-2
2 t1 PoolWorker-8
3 t2 PoolWorker-4
4 t3 PoolWorker-5
5 t4 PoolWorker-6
6 t5 PoolWorker-7
7 t6 PoolWorker-3
8 t7 PoolWorker-9
9 t8 PoolWorker-2
10 t9 PoolWorker-8
Should be 10 but is 10.
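
To see why the original code printed 0, a minimal sketch may help. It reuses the question's Counter class, is written in Python 3 syntax (the listings above are Python 2), and the names `shared` and `local_copy` are only illustrative. Each read of `.value` on the proxy returns a pickled copy of the object held by the manager, so calling `update()` on it changes the copy and nothing else. Reading the copy, changing it, and writing it back with `set()` does change the stored object, though that sequence is not atomic across processes.

```python
import multiprocessing


class Counter(object):
    def __init__(self):
        self.value = 0

    def update(self, value):
        self.value += value


def main():
    manager = multiprocessing.Manager()
    shared = manager.Value(Counter, Counter())

    # Reading shared.value fetches a pickled copy from the manager
    # process; update() mutates that temporary copy only.
    shared.value.update(1)
    print('After in-place update: %s' % shared.value.value)

    # Fetch a copy, change it locally, then send it back with set().
    # Another process could call set() in between, so an update can be lost.
    local_copy = shared.get()
    local_copy.update(1)
    shared.set(local_copy)
    print('After get/update/set: %s' % shared.value.value)

    manager.shutdown()


if __name__ == '__main__':
    main()
```

The first line printed reports 0 and the second reports 1. Registering the class with a custom manager, as in the example above, avoids the copy because the method call runs inside the manager process on the real object.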
dano
  • Thanks for the suggestion. I did not know that I could register classes; it looks nice. I wrote some alternative code like this, found just by experimenting, but I have no idea why it works: `counter = counter_proxy.get(); counter.update(1); counter_proxy.set(counter)`. Reading class attributes directly is not a problem, because they can later be evolved into properties; that is simple and should be used. I am not following the Java pattern where everything must be armored from the start; I prefer to keep things simple at first. – Chameleon Feb 19 '15 at 17:37
  • @Chameleon The `@property` decorator won't work with the default `Proxy` type. If you really want to get access to the `value` attribute directly, see [this question](http://stackoverflow.com/questions/26499548/accessing-an-attribute-of-a-multiprocessing-proxy-of-a-class) for a way to do so. – dano Feb 19 '15 at 17:46
  • It looks complex. I need to study examples. It seems that true threading is a weakness of Python, and documentation with few examples is also a problem. Good point that I should use methods. – Chameleon Feb 19 '15 at 19:14
  • I'm not sure, but I think that `Manager` should be derived from `SyncManager`. In any case, this code works; I'm not an expert in this domain and need to learn it. – Chameleon Feb 19 '15 at 19:25
  • @Chameleon You only need to have `MyManager` derive from `SyncManager` if you want to make use of the types that are pre-registered with `SyncManager` (`dict`, `list`, `Queue`, etc.). Otherwise, `BaseManager` works fine. – dano Feb 19 '15 at 19:28
  • @dano I want to share numpy random state of a parent process with a child process. I've tried using `Manager` but still no luck. Could you please take a look at my question [here](https://stackoverflow.com/questions/49372619/how-to-share-numpy-random-state-of-a-parent-process-with-child-processes) and see if you can offer a solution? I can still get different random numbers if I do `np.random.seed(None)` every time that I generate a random number, but this does not allow me to use the random state of the parent process, which is not what I want. – Amir Mar 20 '18 at 02:06
  • @dano Why is this code extremely slow? If I delete the print and increase the range to 1,000,000, it takes ages. Should managers be avoided when sharing classes? – aprospero Jun 06 '20 at 08:28
  • @aprospero The `Manager` works by creating a separate process, where it manages all the objects you register with it. So every call to `counter_proxy.update` involves IPC to the Manager process. In this case, you're sending `func`/`counter`/`i` to a `Pool` worker process 1,000,000 times (which is IPC that adds overhead), and then making 1,000,000 more IPC calls to the Manager process to update the counter. IPC is expensive, you definitely want it to be minimal compared to the rest of the work your code is doing. – dano Jun 06 '20 at 15:08
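
Following up on the IPC cost described in the last comment, one possible way to keep it small is to accumulate a partial result inside each worker and call the proxy once per chunk rather than once per item. The sketch below is an assumption about how that could look, not part of the original answer: `BatchManager`, `split_range`, and `count_chunk` are hypothetical names, the increment loop stands in for real per-item work, and the code uses Python 3 syntax.

```python
import multiprocessing
from multiprocessing.managers import BaseManager


class Counter(object):
    def __init__(self):
        self._value = 0

    def update(self, value):
        self._value += value

    def get_value(self):
        return self._value


class BatchManager(BaseManager):
    pass


BatchManager.register('Counter', Counter)


def split_range(total, parts):
    """Return `parts` chunk sizes that add up to `total`."""
    base, extra = divmod(total, parts)
    return [base + (1 if i < extra else 0) for i in range(parts)]


def count_chunk(counter_proxy, chunk_size):
    """Do the per-item work locally, then report once to the manager."""
    local_total = 0
    for _ in range(chunk_size):
        local_total += 1  # stand-in for real per-item work
    counter_proxy.update(local_total)  # a single IPC round trip per chunk
    return local_total


def main():
    manager = BatchManager()
    manager.start()
    try:
        counter = manager.Counter()
        workers = multiprocessing.cpu_count()
        pool = multiprocessing.Pool(workers)
        # One task per worker instead of one task per item.
        pending = [pool.apply_async(count_chunk, (counter, size))
                   for size in split_range(1000000, workers)]
        for result in pending:
            result.get()  # re-raises any exception from the worker
        pool.close()
        pool.join()
        print('Should be 1000000 and is %s.' % counter.get_value())
    finally:
        manager.shutdown()


if __name__ == '__main__':
    main()
```

With this layout the number of calls to the manager equals the number of chunks, so the IPC overhead no longer grows with the item count.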