
I have a function that parallelizes another function via a multiprocessing pool, and the parallelized function takes a dictionary as input. I would expect the code below to simply print the numbers from 0 to 32. However, the result shows that many numbers are printed more than once.

Does anybody have an idea why?

import multiprocessing as mp
import numpy as np
import functools

def test(name, t_dict):
    t_dict['a'] = name
    return t_dict

def mp_func(func, iterator, **kwargs):
    f_args = functools.partial(func, **kwargs)
    pool = mp.Pool(mp.cpu_count())
    res = pool.map(f_args, iterator)
    pool.close()
    return res


mod = dict()

m = 33
res = mp_func(func=test, iterator=np.arange(m), t_dict=mod)
for di in res:
    print(di['a'])

  • This is multithreading and multiprocessing. You cannot expect things to happen sequentially. If multiple threads are all modifying the same element of a dictionary, then there is no reason to expect that two updates can't happen before either of them returns. What are you trying to do?? – Frank Yellin Nov 22 '20 at 02:22
  • Note that if you add the line "t_dict = t_dict.copy()" to your code, then it is clear to everyone that each runner has a completely different dictionary, and they never have a chance to interfere with each other. Everything works fine. But again, what is the big picture here? – Frank Yellin Nov 22 '20 at 02:25
  • Multiprocessing will share the memory. You need to lock the process. Take a look [here](https://stackoverflow.com/questions/25557686/python-sharing-a-lock-between-processes) – Paulo Marques Nov 22 '20 at 02:26
  • This is really interesting. I added `print('setting', name, t_dict)` to the top of `test` and got `setting 0 {}`, `setting 1 {'a': 0}`, `setting 2 {}`, `setting 3 {'a': 2}` so sometimes the worker is overwriting an already-set value. This has something to do with chunking and by doing `res = pool.map(f_args, iterator, chunksize=1)` the problem goes away. But _why_ chunking does this ... i can't figure out. – tdelaney Nov 22 '20 at 03:02
  • @FrankYellin the big picture is difficult to explain since this is a reduced minimal example. making a copy of the dict in the test function seems to be working. Yet, I still have the feeling that there should be a more general solution to this problem. –  Nov 22 '20 at 09:16

2 Answers


The problem is that t_dict is passed as part of the partial function f_args. Partial functions are instances of <class 'functools.partial'>. When you create the partial, it stores a reference to test and to the empty dictionary mod. Every time you call f_args, that one dictionary held by the partial object is modified. This is easier to spot with a list in a single process.

>>> import functools
>>> def foo(name, t_list):
...     t_list.append(name)
...     return t_list
... 
>>> mod = []
>>> f = functools.partial(foo, t_list=mod)
>>> f(0)
[0]
>>> f(1)
[0, 1]
>>> f(2)
[0, 1, 2]
>>> mod
[0, 1, 2]

When you call pool.map(f_args, iterator), f_args is pickled and sent to each subprocess to be the worker. Each subprocess therefore has its own copy of the dictionary, and that copy is updated for every iterated value the subprocess happens to get.
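
You can see this copy happen without a pool at all. This is only a sketch of the mechanism: the pickle round trip below stands in for what the pool does when it ships f_args to a worker, and the variable names just mirror the question.

import functools
import pickle

def test(name, t_dict):
    t_dict['a'] = name
    return t_dict

mod = dict()
f_args = functools.partial(test, t_dict=mod)

clone = pickle.loads(pickle.dumps(f_args))  # roughly what a pool worker receives
clone(0)
print(mod)                       # {} - the parent's dict is untouched
print(clone.keywords['t_dict'])  # {'a': 0} - the clone's private copy was mutated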

For efficiency, multiprocessing chunks the data. That is, each subprocess is handed a list of iterated values that it processes into a list of responses to return as a group. But since every response in a chunk references the same single dict, by the time the chunk is returned to the parent all of the responses hold only the final value that was set. If 0, 1 and 2 were processed in one chunk, the return is effectively 2, 2, 2.
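
A single-process simulation of one chunk makes this concrete. Assume, as happens inside a worker for one chunk, that every call reuses the question's test() and one shared dict:

def test(name, t_dict):
    t_dict['a'] = name
    return t_dict

shared = dict()                                 # the one dict held by the unpickled partial
chunk = [test(n, shared) for n in (0, 1, 2)]    # one chunk of work
print(chunk)                                    # [{'a': 2}, {'a': 2}, {'a': 2}]
print(all(d is shared for d in chunk))          # True: three references to one dict

This is also why the chunksize=1 workaround mentioned in the comments hides the symptom: with one value per chunk there is only one response per dict, so nothing is left to overwrite it.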

The solution will depend on your data. It's expensive to pass data back and forth between the pool processes and the parent, so ideally the data is generated completely in the worker. In this case, ditch the partial and have the worker create the dict.

It's likely your situation is more complicated than this.

import multiprocessing as mp
import numpy as np

def test(name):
    # the worker builds and returns its own dict, so nothing is shared
    return {'a': name}

def mp_func(func, iterator, **kwargs):
    pool = mp.Pool(mp.cpu_count())
    res = pool.map(func, iterator)
    pool.close()
    return res

m = 33
res = mp_func(func=test, iterator=np.arange(m))
for di in res:
    print(di['a'])
tdelaney
  • Thanks for your answer. Yeah, unfortunately my situation is more complicated. I need to use partial since the test function has actually way more arguments. –  Nov 22 '20 at 09:10
  • You don't really need a partial specifically, just some way to pass the needed data to the subprocess. If you are on a unix like system you can leverage the fact that forked subprocesses get a copy-on-write view of parent memory and data doesn't need to be sent. You may be able to use shared memory. Or perhaps source the data in the subprocesses. In this trivial case, the worker could copy any mutable containers like that dict and use it for the return (a minimal sketch of that follows below). – tdelaney Nov 22 '20 at 18:27
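
For reference, a minimal sketch of the copy-in-the-worker idea from these comments, keeping the question's partial untouched; the only change is the dict(t_dict) line, and the final print is just a quick sanity check:

import multiprocessing as mp
import numpy as np
import functools

def test(name, t_dict):
    t_dict = dict(t_dict)  # private copy per call, so responses never share a dict
    t_dict['a'] = name
    return t_dict

def mp_func(func, iterator, **kwargs):
    f_args = functools.partial(func, **kwargs)
    pool = mp.Pool(mp.cpu_count())
    res = pool.map(f_args, iterator)
    pool.close()
    return res

if __name__ == '__main__':
    res = mp_func(func=test, iterator=np.arange(33), t_dict=dict())
    print(sorted(d['a'] for d in res))  # 0 .. 32, each exactly once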

As everyone is telling you, in general, it is a bad idea to have multiple threads/processes all modifying the same location, and then expecting that location to have the value that your thread gave it.

Your code will run better if all mutating of the shared data structure happens in only one place. So the general plan is:

def worker(key):
    ... calculate value produced by key ...
    return key, value

def runner():
    with mp.Pool() as pool:
        for key, value in pool.imap_unordered(worker, np.arange(m), chunksize=...):
            ... do fast mutation here ...
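
One way this plan could be filled in; the squaring worker, m = 33 and chunksize=4 are placeholder choices, not part of the answer:

import multiprocessing as mp
import numpy as np

m = 33

def worker(key):
    # stand-in computation; only the (key, value) pair travels back to the parent
    return key, int(key) ** 2

def runner():
    results = {}
    with mp.Pool() as pool:
        # the parent is the only process that ever mutates results
        for key, value in pool.imap_unordered(worker, np.arange(m), chunksize=4):
            results[key] = value
    return results

if __name__ == '__main__':
    res = runner()
    print([res[k] for k in sorted(res)])
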
Frank Yellin