
I want multiple processes to each read a different row of a numpy array in parallel to speed things up. However, when I run the following code, the first process to reach func throws an error as if var were no longer in scope. Why is this happening?

import numpy as np
import multiprocessing as mp

num_procs = 16
num_points = 2500000

def init_worker(X):
    global var
    var = X

def func(proc):
    X_np = np.frombuffer(var).reshape((num_procs, num_points))
    for y in range(num_points):
        z = X_np[proc][y]

if __name__ == '__main__':
    data = np.random.randn(num_procs, num_points)
    X = mp.RawArray('d', num_procs*num_points)
    X_np = np.frombuffer(X).reshape((num_procs, num_points))
    np.copyto(X_np, data)
    pool = mp.Pool(processes=4, initializer=init_worker, initargs=(X,))
    for proc in range(num_procs):
        pool.apply_async(func(proc))
    pool.close()
    pool.join()
Traceback (most recent call last):
  File "parallel_test.py", line 26, in <module>
    pool.apply_async(func(proc))
  File "parallel_test.py", line 13, in func
    X_np = np.frombuffer(var).reshape((num_procs, num_points))
NameError: global name 'var' is not defined

Update: For some reason, if I use Pool.map instead of the for loop with Pool.apply_async, it seems to work. I don’t understand why though.
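For reference, this is roughly the Pool.map version that worked for me, a minimal sketch that reuses init_worker and func exactly as defined above:

# Sketch of the Pool.map variant: the pool passes each row index to the
# workers, so func runs in the child processes rather than in the parent.
if __name__ == '__main__':
    data = np.random.randn(num_procs, num_points)
    X = mp.RawArray('d', num_procs*num_points)
    X_np = np.frombuffer(X).reshape((num_procs, num_points))
    np.copyto(X_np, data)
    with mp.Pool(processes=4, initializer=init_worker, initargs=(X,)) as pool:
        pool.map(func, range(num_procs))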

van
    Welcome to SO. Please take the [tour](https://stackoverflow.com/tour), read [How do I ask a good question?](https://stackoverflow.com/help/how-to-ask) and [How to create a Minimal, Reproducible Example](https://stackoverflow.com/help/minimal-reproducible-example). Please add the entire stacktrace to your question, it will help us provide better answers. – Michael Ruth Apr 04 '21 at 03:40
  • see my recent [answer](https://stackoverflow.com/a/66936542/3220135) on using globals for constants (data you don't change) in multiprocessing. If you want to read data back from the child processes using a real shared array, see my other [answer](https://stackoverflow.com/a/66380200/3220135) on the topic from a little while ago. – Aaron Apr 04 '21 at 06:53
  • Thanks, I am trying to use a real shared array; that previous answer was helpful. I'm still not sure what was wrong with what I had, though. – van Apr 04 '21 at 17:54
  • `pool.apply_async(func(proc))` should be `pool.apply_async(func, args=(proc,))`. You were calling `func` from the main process. – Booboo Apr 05 '21 at 13:43
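
A minimal sketch of the call Booboo's comment describes, assuming the same pool created with initializer=init_worker, initargs=(X,) as in the question:

# Pass func and its argument separately so the pool runs func in a worker
# process instead of the parent evaluating func(proc) on the spot.
results = [pool.apply_async(func, args=(proc,)) for proc in range(num_procs)]
pool.close()
pool.join()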

1 Answer


Any reason to not declare X as global in the top-level scope? This eliminates the NameError.

import numpy as np
import multiprocessing as mp

num_procs = 16 
num_points = 25000000


def func(proc):
    X_np = np.frombuffer(X).reshape((num_procs, num_points))
    for y in range(num_points):
        z = X_np[proc][y]

if __name__ == '__main__':
    data = np.random.randn(num_procs, num_points)
    global X 
    X = mp.RawArray('d', num_procs*num_points)
    X_np = np.frombuffer(X).reshape((num_procs, num_points))
    np.copyto(X_np, data)
    pool = mp.Pool(processes=4)
    for proc in range(num_procs):
        pool.apply_async(func(proc))
    pool.close()
    pool.join()

When I run a reduced instance of this problem (num_procs = 4, num_points = 5, i.e. 20 values in total):

import numpy as np
import multiprocessing as mp

num_procs = 4 
num_points = 5


def func(proc):
    X_np = np.frombuffer(X).reshape((num_procs, num_points))
    for y in range(num_points):
        z = X_np[proc][y]

if __name__ == '__main__':
    data = np.random.randn(num_procs, num_points)
    global X 
    X = mp.RawArray('d', num_procs*num_points)
    X_np = np.frombuffer(X).reshape((num_procs, num_points))
    np.copyto(X_np, data)
    pool = mp.Pool(processes=4)
    for proc in range(num_procs):
        pool.apply_async(func(proc))
    pool.close()
    pool.join()
    print("\n".join(map(str, X)))

I get the following output:

-0.6346037804619162
1.1005724710066107
0.33458763357165255
0.6409345714971889
0.7124888766851982
0.36760459213332963
0.23593304931386933
-0.8668969562941349
-0.8842756219923469
0.005979036105620422
1.386422154089567
-0.8770988782214508
0.25187448339771057
-0.2473967968471952
-0.4909708883978521
0.5423521489750244
0.018749603867333802
0.035304792504378055
1.3263872668956616
1.0199839603892742

You haven't provided a sample of the expected output. Does this look similar to what you expect?

Michael Ruth
  • Try `get`ting any of the `AsyncResult`s you generate here (see the sketch after these comments)... you get a name error still because X is not defined in the child processes when spawn is used... (I'm assuming you had a couple of commas in your `apply_async` line here too that are just missing in your post; otherwise you're not actually executing anything in the children.) – Aaron Apr 04 '21 at 06:43
  • I was trying to compare how long it takes to iterate through a large array with multiple cores in parallel vs. without, which is why func doesn’t do anything or produce any output. – van Apr 04 '21 at 17:48
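
A minimal sketch of the check Aaron's comment suggests, applied to the answer's loop (assuming the same pool and func as in the answer, and the spawn start method, which is the default on Windows and macOS); get() re-raises any exception that occurred in the worker:

# Keep the AsyncResult handles and call get() on each one; this surfaces
# worker-side exceptions such as the NameError for X under spawn.
results = [pool.apply_async(func, args=(proc,)) for proc in range(num_procs)]
pool.close()
pool.join()
for r in results:
    r.get()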