
I am practicing using shared values for multiprocessing. I have an existing Process-based function that works using a shared value:

from multiprocessing import Process, Value

def run_procs_with_loop(lock):

    # this is my shared value 
    shared_number = Value('i', 0)

    print(__name__, 'shared_value in the beginning', shared_number.value)

    # create a process list to collect each process spawned by the for-loop
    processes = []
    for _ in range(2):
        p = Process(target=add_100_locking, args=(shared_number, lock))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print('shared_value at the end', shared_number.value)


The code above spawns TWO processes, and each process runs the target function with args (shared_number, lock). The function ran as expected.

I tried to convert it to a multiprocessing Pool. I attempted to pass the argument `[(shared_number, lock)] * 2` in my pool.map() statement (I want the Pool to spawn just two processes), but Python is rejecting it:

def run_procs_with_pool(lock):

    shared_number = Value('i', 0)
    print(__name__, 'shared_value in the beginning', shared_number.value)

    # create processes using multiprocessing.Pool
    pool = Pool()
    pool.map(add_100_with_lock, [(shared_number,lock)] * 2)

    print('shared_value at the end', shared_number.value)


Thanks for any helpful input in advance.


Update:

Someone suggested that I use starmap instead of map, but now I am getting the error RuntimeError: Synchronized objects should only be shared between processes through inheritance. It looks like multiprocessing.Pool does not allow shared values to be passed in this way?

Thought I'd share the task function add_100_with_lock as shown below:

def add_100_with_lock(num, lock):
    for _ in range(100):
        time.sleep(0.001)
        with lock:
            num.value += 1

Is there a way to make passing shared values work with multiprocessing.Pool?

punsoca

2 Answers


When you write

pool.map(add_100_with_lock, [(shared_number,lock)] * 2)

the iterable you are passing is a list of tuples, so add_100_with_lock does not get two parameters but a single tuple, as if you had called add_100_with_lock((shared_number, lock)) instead of add_100_with_lock(shared_number, lock). Pool.map is implemented for functions that take only one parameter.
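
You can verify this with a tiny standalone sketch (the show function below is a hypothetical demo, not part of the original code):

from multiprocessing import Pool

# Pool.map hands each element of the iterable to the target function as its
# single argument, so each tuple arrives as one positional parameter.
def show(arg):
    print(type(arg), arg)   # prints: <class 'tuple'> ('a', 'b')

if __name__ == '__main__':
    with Pool(2) as pool:
        pool.map(show, [('a', 'b')] * 2)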

You can change the definition of add_100_with_lock to accept a single tuple, although I do not recommend this solution. You can also wrap it in another function that receives a tuple, unpacks it, and calls it, i.e.:

def wrap_add_100(args):
    return add_100_with_lock(*args)
...
pool.map(wrap_add_100, [(shared_number,lock)] * 2)

or use Pool.starmap, which expects an iterable of argument tuples and unpacks each tuple as the function's parameters:

pool.starmap(add_100_with_lock, [(shared_number, lock)] * 2)

This last option is what I recommend, since it preserves the function signature.

azelcer
  • Thank you azelcer. However, when I tried starmap I got this error: "RuntimeError: Synchronized objects should only be shared between processes through inheritance" – punsoca Feb 17 '22 at 06:55
  • It looks like a process pool does not like passing shared data, whereas multiprocessing.Process allows it with ease. – punsoca Feb 17 '22 at 16:49

I finally was able to work around the restriction in multiprocessing Pool with regard to shared variables by using a Manager() object. Per the Python documentation, managers provide a way to create data "which can be shared between different processes", including sharing over a network between processes running on different machines.

This is how I did it:

    # use the manager class to share objects between processes
    manager = Manager()
    shared_number = manager.Value('i', 0)

And since I would be passing only shared_number (the lock object is passed at Pool creation time using the initializer= kwarg; you can read all about it in the multiprocessing Lock() discussion here), I can go back to using pool.map() instead of pool.starmap().

Here is the COMPLETE working module:

from  multiprocessing import Lock, Pool, Manager
import time

# init function passed to the Pool initializer to share a multiprocessing.Lock() object with the worker processes
def init_lock(l):
    global lock
    lock = l

def add_100_with_lock(num):

    # Our pool runs this task in TWO worker processes, and both tasks share the 'num' variable,
    # so this 'num' value will be 200 after the two tasks are done executing (100 * 2 = 200).
    # I applied multiprocessing locking here to avoid race conditions between worker processes.
    for _ in range(100):
        time.sleep(0.001)
        with lock:
            num.value += 1

# Pool method 
def run_procs_lock_with_pool():
    
    # use the manager class to share objects between processes
    manager = Manager()
    shared_number = manager.Value('i', 0)

    print(__name__, 'shared_value in the beginning', shared_number.value)

    # like shared values, locks cannot be passed to a Pool as task arguments - instead, pass the
    # multiprocessing.Lock() at Pool creation time, using initializer=init_lock.
    # This will make your lock instance global in all the child workers.
    # The init_lock is defined as a function - see init_lock() at the top.
    l = Lock()
    pool = Pool(2, initializer=init_lock, initargs=(l,))
    # the iterable "[shared_number]*2" submits two tasks, one for each worker process
    pool.map(add_100_with_lock, [shared_number]*2)
    pool.close()
    pool.join()


    print('shared_value at the end', shared_number.value)


if __name__ == '__main__':

    run_procs_lock_with_pool()
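
For reference, running this module should print shared_value at the end 200, since the pool executes the task twice and each execution adds 100 to the shared value.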
punsoca