https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Array
What I’m trying to do
Create an array in MainProcess and pass it through inheritance to any subsequent child processes. The child processes will change the array. The parent process will watch for the changes and act accordingly.
The problem
The parent process does not "see" any changes made by the child processes. However, the child processes do "see" each other's changes, i.e. if child 1 adds an item, then child 2 will see that item, and so on.
This is true for sARRAY, iARRAY, and iVALUE.
BUT while the parent process seems oblivious to the array values, it does notice the changes made to iVALUE.
I don’t understand what I’m doing wrong.
UPDATE 2
https://stackoverflow.com/a/6455704/1267259
> "The main source of confusion is that multiprocessing uses separate processes and not threads. This means that any changes to object state made by the children aren't automatically visible to the parent."
To clarify: what I want to do is possible, right? https://stackoverflow.com/a/26554759/1267259 I mean, that's the purpose of multiprocessing Array and Value, to communicate between child and parent processes? And iVALUE works, so...
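For comparison, here is a minimal sketch of child processes writing into a multiprocessing.Array and the parent reading the result. It assumes the array is passed to each child explicitly and that elements are accessed by index or slice rather than through a `value` attribute. The names `write_slot` and `run_demo` are made up for illustration.

```python
import ctypes
import multiprocessing


def write_slot(shared, slot, value):
    # Indexing the Array wrapper writes into the shared buffer.
    with shared.get_lock():
        shared[slot] = value


def run_demo():
    shared = multiprocessing.Array(ctypes.c_int, 3)  # zero-initialised
    procs = [
        multiprocessing.Process(target=write_slot, args=(shared, i, (i + 1) * 10))
        for i in range(3)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    # Slicing copies the current shared contents into a plain list.
    return shared[:]


if __name__ == "__main__":
    print(run_demo())
```

If this prints the values written by the children, the Array itself is shared as expected and the difference is likely in how the elements are accessed.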
I’ve found this: "Shared Array not shared correctly in python multiprocessing"
But I don’t understand the answer: "Assigning to values that have meaning in all processes seems to help:"
UPDATE 1
Found "Python : multiprocessing and Array of c_char_p"
> "the assignment to arr[i] points arr[i] to a memory address that was only meaningful to the subprocess making the assignment. The other subprocesses retrieve garbage when looking at that address."
As I understand it this doesn't apply to this problem. The assignment by one subprocess to the array does make sense to the other subprocesses in this case. But why doesn't it make sense for the main process?
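For reference, a pointer type such as c_wchar_p stores only an address in the shared buffer, while a character array stores the bytes themselves. The sketch below shows the latter, assuming a fixed maximum length of 32 bytes per string is acceptable. The name `name_buf` is hypothetical.

```python
import multiprocessing

# 'c' is the typecode for ctypes.c_char; the bytes live in the shared buffer,
# so no process-local pointer is involved.
name_buf = multiprocessing.Array('c', 32)
name_buf.value = b"item 1"

print(name_buf.value)     # the bytes up to the first NUL
print(len(name_buf.raw))  # size of the whole fixed-size buffer
```

For character arrays, `value` and `raw` are real properties backed by shared memory, which is not the case for arrays of other types.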
And I am aware of "managers" but it feels like Array should suffice for this use case. I've read the manual but obviously I don't seem to get it.
UPDATE 3
Indeed, this works
manage = multiprocessing.Manager()
manage = list(range(3))
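A sketch of the Manager approach, assuming the intent was a proxy list created with `manager.list(...)` (a plain `list(range(3))` would not be shared). The helper names `append_item` and `run_manager_demo` are made up for illustration.

```python
import multiprocessing


def append_item(shared_list, item):
    # The proxy forwards the call to the manager's server process.
    shared_list.append(item)


def run_manager_demo():
    with multiprocessing.Manager() as manager:
        shared_list = manager.list(range(3))
        p = multiprocessing.Process(target=append_item, args=(shared_list, 99))
        p.start()
        p.join()
        # Copy the contents out before the manager shuts down.
        return list(shared_list)


if __name__ == "__main__":
    print(run_manager_demo())
```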
So...
What am I doing wrong?
import multiprocessing
import ctypes
from time import sleep

class MainProcess:
    # keep track of process
    iVALUE = multiprocessing.Value('i', -1) # this works
    # keep track of items
    sARRAY = multiprocessing.Array(ctypes.c_wchar_p, 1024) # this works between child processes
    iARRAY = multiprocessing.Array(ctypes.c_int, 3) # this works between child processes
    pLOCK = multiprocessing.Lock()

    def __init__(self):
        # create an index for each process
        self.sARRAY.value = [None] * 3
        self.iARRAY.value = [None] * 3

    def InitProcess(self):
        # list of items to process
        items = []
        item = (i for i in items)
        with multiprocessing.Pool(3) as pool:
            # main loop: keep looking for updated values
            while True:
                try:
                    # eat_finished_cake is the callback that handles the finished result
                    pool.apply_async(self.worker, (next(item),), callback=eat_finished_cake)
                except StopIteration:
                    pass
                print(self.sARRAY) # yields [None][None][None]
                print(self.iARRAY) # yields [None][None][None]
                print(self.iVALUE) # yields 1-3
            pool.close()
            pool.join()

    def worker(self, item):
        with self.pLOCK:
            self.iVALUE.value += 1
            self.sARRAY.value[self.iVALUE.value] = item # value: 'item 1'
            self.iARRAY.value[self.iVALUE.value] = 2
            # on next child process run
            print(self.iVALUE.value) # prints 1
            print(self.sARRAY.value) # prints ['item 1'][None][None]
            print(self.iARRAY.value) # prints [2][None][None]
        sleep(0.5)
        ...
        with self.pLOCK:
            self.iVALUE.value -= 1
UPDATE 4
Changing
pool.apply_async(self.worker, (next(item),))
To
x = pool.apply_async(self.worker, (next(item),))
print(x.get())
Or
x = pool.apply(self.worker, (next(item),))
print(x)
And in self.worker(), returning self.iARRAY.value or self.sARRAY.value does return a variable that has the updated value. This is not what I want to achieve though; it doesn't even require the use of Array...
So I need to clarify. In self.worker() I'm doing important heavy lifting that can take a long time, and I need to send information back to the main process while it runs, e.g. the progress, before the final return value is sent to the callback.
I don't expect the finished result to be returned to the main method; that is handled by the callback function. I see now that omitting the callback in the code example could have given a different impression, sorry.
UPDATE 5
Re: "Use numpy array in shared memory for multiprocessing"
I've seen that answer and tried a variation of it using initializer() with a global variable, passing the array through initargs, with no luck. I don't understand the use of NumPy together with "closing()", but that code doesn't seem to access "arr" inside main(); shared_arr is used, but only after p.join().
As far as I can see, the array is declared, then turned into a NumPy array and inherited through init(x). My code should behave the same as that code so far.
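The initializer/initargs pattern mentioned above can be sketched without NumPy as follows. This is an illustration under assumptions, not the linked answer's code: the names `init_worker`, `work`, `run_pool` and `_progress` are hypothetical, and the short sleeps stand in for the heavy lifting. The parent polls the shared array while the workers are still running.

```python
import ctypes
import multiprocessing
import time

_progress = None  # set in each worker process by init_worker


def init_worker(shared):
    # Runs once per worker; keeps a module-level reference to the shared Array.
    global _progress
    _progress = shared


def work(slot):
    for step in range(1, 4):
        time.sleep(0.05)        # stand-in for the heavy lifting
        _progress[slot] = step  # publish progress by index, not via .value
    return slot


def run_pool():
    progress = multiprocessing.Array(ctypes.c_int, 3)
    seen = []
    with multiprocessing.Pool(3, initializer=init_worker, initargs=(progress,)) as pool:
        results = [pool.apply_async(work, (i,)) for i in range(3)]
        while not all(r.ready() for r in results):
            seen.append(progress[:])  # parent reads a snapshot while workers run
            time.sleep(0.02)
        for r in results:
            r.get()
    return progress[:], seen


if __name__ == "__main__":
    final, snapshots = run_pool()
    print(final, len(snapshots))
```

Passing the Array through initargs avoids pickling it with every task, which a Pool does not allow for shared ctypes objects.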
One major difference seems to be how the array is accessed.
So far I've only succeeded in setting and getting array values through the attribute value. When I tried indexing instead:
self.iARRAY[0] = 1 # instead of iARRAY.value = [None] * 3
self.iARRAY[1] = 1
self.iARRAY[2] = 1
print(self.iARRAY) # prints <SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_int_Array_3 object at 0x7f9cfa8538c8>>
And I can't find a method to access and check the values (the attribute "value" gives an unknown method error)
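A short sketch of reading the values back: slicing the wrapper returns a plain list copy of the shared contents. The second half illustrates a possible cause of the behaviour described in this question, stated as an assumption: on an Array of a non-char type, `value` does not appear to be a shared property, so assigning to it only attaches an ordinary Python list to the wrapper object in the current process.

```python
import ctypes
import multiprocessing

arr = multiprocessing.Array(ctypes.c_int, 3)

arr[0] = 1         # element assignment goes to shared memory
arr[1:3] = [2, 3]  # slice assignment does too
snapshot = arr[:]  # slicing returns a plain list copy
print(snapshot, len(arr))

# Assumption: this creates a regular attribute, not shared state.
arr.value = [None] * 3
arr.value[0] = 99
print(arr[:])      # the shared contents are not affected by the two lines above
```

If that assumption holds, each process would keep its own copy of the `value` list, which would match the parent never seeing the children's changes.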
Another major difference from that code is the prevention of data copying using get_obj().
Isn't this a NumPy issue?
assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)
Not sure how to make use of that.
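Without NumPy, get_obj() can still be used on its own. A minimal sketch, assuming the default lock created by multiprocessing.Array: get_obj() returns the underlying ctypes array, which is indexed like a list while the lock is held explicitly.

```python
import ctypes
import multiprocessing

arr = multiprocessing.Array(ctypes.c_int, 3)

with arr.get_lock():
    raw = arr.get_obj()  # underlying ctypes array, no per-access locking
    raw[0] = 2
    raw[1] = raw[0] + 1  # read-modify-write stays consistent under the lock

print(arr[:])
```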
def worker(self, item):
    with self.pLOCK:
        self.iVALUE.value += 1
        self.sARRAY.value[self.iVALUE.value] = item # value: 'item 1'
        with self.iARRAY.get_lock():
            arr = self.iARRAY.get_obj()
            arr[self.iVALUE.value] = 2 # and now ???
    sleep(0.5)
    ...
    with self.pLOCK:
        self.iVALUE.value -= 1
UPDATE 6
I've tried using multiprocessing.Process() instead of Pool(), but the result is the same.