
I'm trying to use a multiprocessing shared array. I want to create two functions that both work on a common data set, where only one of them modifies the array.

Here is my code:

import numpy as np
import multiprocessing as mp
import time

arr = mp.Array('f', np.array([1,2,3,4]))

def testfunc1(arr):
    while True:
        arr = np.concatenate((arr, np.array([0])))
        print(arr)
        time.sleep(2)

def testfunc2(arr):
    while True:
        arr[0] *= 2
        time.sleep(2)
        print(arr)

proc1 = mp.Process(target=testfunc1, args=(arr,))
proc2 = mp.Process(target=testfunc2, args=(arr,))

proc1.start()
proc2.start()

This is the result:

[1. 2. 3. 4. 0.]
[1. 2. 3. 4. 0. 0.]
<SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_float_Array_4 object at 0x7fcd865bfdd0>>
<SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_float_Array_4 object at 0x7fcd865bfdd0>>
[1. 2. 3. 4. 0. 0. 0.]
<SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_float_Array_4 object at 0x7fcd865bfdd0>>
[1. 2. 3. 4. 0. 0. 0. 0.]
<SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_float_Array_4 object at 0x7fcd865bfdd0>>
[1. 2. 3. 4. 0. 0. 0. 0. 0.]

testfunc2 doesn't seem to work. What am I doing wrong?

Klaus D.
Sinscere

3 Answers


Multiprocessing runs separate Python processes, so ordinary objects are not shared between them: each process works on its own copy.

I don't know if I would advise it as good practice, but multiprocessing does have a shared_memory module (with a SharedMemory class) which you could explore: https://docs.python.org/3/library/multiprocessing.shared_memory.html

It doesn't look too simple to use, and in my opinion feels a bit like an antipattern, but there it is.
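For reference, here is a minimal sketch of what the shared_memory approach might look like with a NumPy array. The helper names double_first and run_demo are made up for illustration, and the block assumes Python 3.8+ and that the array's shape and dtype are passed to the worker alongside the block's name.

```python
import numpy as np
from multiprocessing import Process, shared_memory


def double_first(shm_name, shape, dtype):
    """Attach to an existing shared block by name and double element 0 in place."""
    shm = shared_memory.SharedMemory(name=shm_name)
    try:
        view = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
        view[0] *= 2
        del view  # drop the NumPy view before closing the handle
    finally:
        shm.close()


def run_demo():
    """Create a shared block, let a child process modify it, return the values."""
    source = np.array([1, 2, 3, 4], dtype=np.float32)
    shm = shared_memory.SharedMemory(create=True, size=source.nbytes)
    try:
        shared = np.ndarray(source.shape, dtype=source.dtype, buffer=shm.buf)
        shared[:] = source  # copy the initial values into shared memory
        worker = Process(target=double_first,
                         args=(shm.name, source.shape, source.dtype))
        worker.start()
        worker.join()
        result = shared.tolist()
        del shared
        return result
    finally:
        shm.close()
        shm.unlink()  # only the creating process should unlink


if __name__ == "__main__":
    print(run_demo())
```

Note that the block has a fixed size, so something like np.concatenate would still produce a private copy rather than grow the shared data.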

Otherwise I would advise either:

  1. Use queues to communicate between processes, and restructure your program to work that way.
  2. Use the threading module. Keep in mind that unsynchronized access to shared data WILL lead to race conditions, where the result of the program (or the error it raises) changes from run to run.
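To illustrate the first option, here is a rough sketch in which a single process owns the data and the others only send it commands through a queue. The command names, the STOP sentinel, and the functions apply_command and owner are all hypothetical; this is one possible layout, not the only one.

```python
import multiprocessing as mp

STOP = None  # sentinel telling the owner process to finish


def apply_command(data, command):
    """Return a new list with one command applied to it."""
    if command == "append_zero":
        return data + [0.0]
    if command == "double_first":
        return [data[0] * 2] + data[1:]
    raise ValueError(f"unknown command: {command!r}")


def owner(commands, results, initial):
    """Own the data: apply each queued command and publish a snapshot."""
    data = list(initial)
    while True:
        command = commands.get()
        if command is STOP:
            break
        data = apply_command(data, command)
        results.put(data)


if __name__ == "__main__":
    commands, results = mp.Queue(), mp.Queue()
    proc = mp.Process(target=owner,
                      args=(commands, results, [1.0, 2.0, 3.0, 4.0]))
    proc.start()
    for command in ("append_zero", "double_first", "append_zero"):
        commands.put(command)
        print(results.get())  # snapshot after each command
    commands.put(STOP)
    proc.join()
```

Because only the owner ever touches the data, no locking is needed, and the data can grow freely.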

I strongly recommend Raymond Hettinger's keynote on concurrency in Python; it will help you understand how to approach this: https://pybay.com/site_media/slides/raymond2017-keynote/index.html

Peter White

The issue here is that you are trying to write to arr in a different process, but that arr is not the arr you expect. You need to think a little differently when dealing with multiple processes, which have separate memory spaces.

Check out this answer on how you could resolve this.
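As a small illustration of "not the arr you expect", the sketch below contrasts rebinding the local name (which builds a private object) with item assignment (which writes through to the shared buffer). The function names rebind and write_in_place are made up for this example, and plain lists stand in for NumPy arrays.

```python
import multiprocessing as mp


def rebind(arr):
    """Rebinding the local name builds a private list; the shared array is untouched."""
    arr = arr[:] + [0.0]  # arr now refers to a new, unshared list
    return arr


def write_in_place(arr):
    """Item assignment goes through the wrapper to the shared buffer."""
    arr[0] *= 2


if __name__ == "__main__":
    shared = mp.Array("f", [1, 2, 3, 4])

    private = rebind(shared)
    print(private)    # a separate list with one extra element
    print(shared[:])  # slicing copies the current shared values into a list

    proc = mp.Process(target=write_in_place, args=(shared,))
    proc.start()
    proc.join()
    print(shared[:])  # the child's write is visible in the parent
```

Printing shared[:] rather than shared also avoids the SynchronizedArray wrapper repr seen in the question's output.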

wombat

You're on the right track. While you should strive to avoid sharing state between processes, multiprocessing.Array was created so that you can share state when you absolutely need to. Here, I've just commented out the lines that perform operations on the array.

import numpy as np
import multiprocessing as mp
import time

arr = mp.Array('f', np.array([1,2,3,4]))

def testfunc1(arr):
    while True:
        # arr = np.concatenate((arr, np.array([0])))
        print(arr)
        time.sleep(2)

def testfunc2(arr):
    while True:
        # arr[0] *= 2
        time.sleep(2)
        print(arr)

proc1 = mp.Process(target=testfunc1, args=(arr,))
proc2 = mp.Process(target=testfunc2, args=(arr,))

proc1.start()
proc2.start()

This prints out the following:

<SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_float_Array_4 object at 0x7f82f890f340>>
<SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_float_Array_4 object at 0x7f82f890f340>>
<SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_float_Array_4 object at 0x7f82f890f340>>

mp.Array copies the values of the NumPy array into a SynchronizedArray object, which is shared between the processes. In testfunc2 you print the SynchronizedArray wrapper itself, which is why you see its repr rather than its values. In testfunc1, notice that np.concatenate implicitly converts the SynchronizedArray into a new NumPy array. To get NumPy output from testfunc2 as well, change the code like this:

import numpy as np
import multiprocessing as mp
import time

arr = mp.Array('f', np.array([1,2,3,4]))

def testfunc1(arr):
    while True:
        arr = np.concatenate((arr, np.array([0])))
        print(arr)
        time.sleep(2)

def testfunc2(arr):
    arr = np.array(arr)
    while True:
        arr[0] = arr[0] * 2
        time.sleep(2)
        print(arr)

proc1 = mp.Process(target=testfunc1, args=(arr,))
proc2 = mp.Process(target=testfunc2, args=(arr,))

proc1.start()
proc2.start()

This returns:

[1. 2. 3. 4. 0.]
[2. 2. 3. 4.]
[1. 2. 3. 4. 0. 0.]
[4. 2. 3. 4.]
[1. 2. 3. 4. 0. 0. 0.]
[8. 2. 3. 4.]
[1. 2. 3. 4. 0. 0. 0. 0.]
[16.  2.  3.  4.]
[1. 2. 3. 4. 0. 0. 0. 0. 0.]
[32.  2.  3.  4.]
[1. 2. 3. 4. 0. 0. 0. 0. 0. 0.]
[64.  2.  3.  4.]
[1. 2. 3. 4. 0. 0. 0. 0. 0. 0. 0.]

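However, is this what you wanted? Note that each process now works on its own NumPy copy, so the changes made in one are not visible in the other. Otherwise, I'd look at the other two answers, which suggest restructuring your problem so that you don't need shared state between processes.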

Redowan Delowar
    Thank you for your reply, but unfortunately this is not what I wanted to do. I need two different functions to work on the same array. In my example, this should lead to the following result: `[1. 2. 3. 4. 0.] [2. 2. 3. 4. 0.] [2. 2. 3. 4. 0. 0.] [4. 2. 3. 4. 0. 0.] [4. 2. 3. 4. 0. 0. 0.] [8. 2. 3. 4. 0. 0. 0.]` – Sinscere Nov 17 '21 at 20:48
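One way to get close to the behaviour described in this comment is sketched below. A shared array cannot grow, so the sketch assumes a preallocated buffer with a made-up upper bound (CAPACITY) and a shared counter for the number of used elements, both guarded by one lock. All names here (snapshot, append_zero, double_first, appender, doubler) are hypothetical, and the sleep offsets only approximate the alternating order.

```python
import multiprocessing as mp
import time

CAPACITY = 16  # assumed upper bound; shared arrays cannot grow


def snapshot(buf, length):
    """Copy the used part of the shared buffer into a list."""
    return buf[: length.value]


def append_zero(buf, length, lock):
    """Mark one more slot as used, set it to zero, and return a snapshot."""
    with lock:
        if length.value < len(buf):
            buf[length.value] = 0.0
            length.value += 1
        return snapshot(buf, length)


def double_first(buf, length, lock):
    """Double element 0 of the shared buffer and return a snapshot."""
    with lock:
        buf[0] *= 2
        return snapshot(buf, length)


def appender(buf, length, lock, rounds):
    for _ in range(rounds):
        print(append_zero(buf, length, lock))
        time.sleep(2)


def doubler(buf, length, lock, rounds):
    for _ in range(rounds):
        time.sleep(1)  # offset so the two processes roughly alternate
        print(double_first(buf, length, lock))
        time.sleep(1)


if __name__ == "__main__":
    initial = [1.0, 2.0, 3.0, 4.0]
    lock = mp.Lock()
    buf = mp.Array("f", CAPACITY, lock=False)  # raw shared array, guarded by `lock`
    length = mp.Value("i", len(initial), lock=False)
    buf[: len(initial)] = initial

    procs = [mp.Process(target=appender, args=(buf, length, lock, 3)),
             mp.Process(target=doubler, args=(buf, length, lock, 3))]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
```

The snapshots are plain Python lists; wrapping them in np.array(...) before printing would give NumPy-style output.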