
I'm trying to update a shared variable (a numpy array in a namespace) when using the multiprocessing module. However, the variable is not updated and I don't understand why.

Here is a sample code to illustrate this:

from multiprocessing import Process, Manager
import numpy as np

chunk_size = 15
arr_length = 1000
jobs = []
namespace = Manager().Namespace()
namespace.arr = np.zeros(arr_length)
nb_chunk = arr_length // chunk_size + 1


def foo(i, ns):
    from_idx = chunk_size*i
    to_idx = min(arr_length, chunk_size*(i+1))
    ns.arr[from_idx:to_idx] = np.random.randint(0, 100, to_idx-from_idx)

for i in range(nb_chunk):
    p = Process(target=foo, args=(i, namespace))
    p.start()
    jobs.append(p)
for p in jobs:
    p.join()

print(namespace.arr[:10])
hansaplast
GuillaumeA

2 Answers


You cannot share built-in objects like list or dict across processes in Python. To exchange data between processes, Python's multiprocessing provides two data structures: Queue and Pipe.

Also read: Exchanging objects between processes

Moinuddin Quadri
  • ok, but how could I take advantage that in each process, I do not access the same memory space of the array ? – GuillaumeA Feb 01 '17 at 16:58
  • Similar to your current code, by passing them as an argument to function (`foo()` in your case). You may refer [Running multiple asynchronous function and get the returned value of each function](http://stackoverflow.com/questions/40536287/running-multiple-asynchronous-function-and-get-the-returned-value-of-each-functi), the question I asked in the past. It is having answer with the usage of `Queue` – Moinuddin Quadri Feb 01 '17 at 17:01

The issue is that the Manager().Namespace() object doesn't notice that you're changing anything with ns.arr[from_idx:to_idx] = ... (you're mutating an inner data structure rather than reassigning the attribute), so the change is not propagated to the other processes.

This answer explains very well what's going on here.

To fix it, create the list as a Manager().list() and pass this list to the processes, so that ns[from_idx:to_idx] = ... is recognized as a change and propagated to the other processes:

from multiprocessing import Process, Manager
import numpy as np

chunk_size = 15
arr_length = 1000
jobs = []
arr = Manager().list([0] * arr_length)

nb_chunk = arr_length // chunk_size + 1


def foo(i, ns):
    from_idx = chunk_size*i
    to_idx = min(arr_length, chunk_size*(i+1))
    ns[from_idx:to_idx] = np.random.randint(0, 100, to_idx-from_idx)

for i in range(nb_chunk):
    p = Process(target=foo, args=(i, arr))
    p.start()
    jobs.append(p)
for p in jobs:
    p.join()

print(arr[:10])
hansaplast