0

https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Array

What I'm trying to do: Create an array in MainProcess and pass it through inheritance to any subsequent child processes. The child processes will change the array. The parent process will watch for the changes and act accordingly.

The problem: The parent process does not "see" any changes made by the child processes. However, the child processes do "see" each other's changes, i.e. if child 1 adds an item, then child 2 will see that item, and so on.

This is true for sARRAY and iARRAY, and iVALUE.

BUT: while the parent process seems to be oblivious to the array values, it does notice the changes made to iVALUE.

I don’t understand what I’m doing wrong.

UPDATE 2 https://stackoverflow.com/a/6455704/1267259 The main source of confusion is that multiprocessing uses separate processes and not threads. This means that any changes to object state made by the children aren't automatically visible to the parent.

To clarify: what I want to do is possible, right? https://stackoverflow.com/a/26554759/1267259 I mean, that's the purpose of multiprocessing Array and Value, to communicate between child and parent processes? And iVALUE works, so...

I've found this question: "Shared Array not shared correctly in python multiprocessing"

But I don’t understand the answer "Assigning to values that have meaning in all processes seems to help:"

UPDATE 1 Found this question: "Python : multiprocessing and Array of c_char_p"

> "the assignment to arr[i] points arr[i] to a memory address that was only meaningful to the subprocess making the assignment. The other subprocesses retrieve garbage when looking at that address."

As I understand it this doesn't apply to this problem. The assignment by one subprocess to the array does make sense to the other subprocesses in this case. But why doesn't it make sense for the main process?

And I am aware of "managers" but it feels like Array should suffice for this use case. I've read the manual but obviously I don't seem to get it.

UPDATE 3 Indeed, this works

manager = multiprocessing.Manager()
shared_list = manager.list(range(3))  # list proxy served by the manager process

So...

What am I doing wrong?

import multiprocessing
import ctypes
from time import sleep

class MainProcess:

    # keep track of process
    iVALUE = multiprocessing.Value('i',-1) # this works
    # keep track of items
    sARRAY = multiprocessing.Array(ctypes.c_wchar_p, 1024) # this works between child processes
    iARRAY = multiprocessing.Array(ctypes.c_int, 3) # this works between child processes

    pLOCK = multiprocessing.Lock()

    def __init__(self):
        # create an index for each process
        self.sARRAY.value = [None] * 3
        self.iARRAY.value = [None] * 3

    def InitProcess(self):
        # list of items to process
        items = []
        item = (i for i in items)
        with multiprocessing.Pool(3) as pool:
            # main loop: keep looking for updated values
            while True:
                try:
                    pool.apply_async(self.worker, (next(item),), callback=eat_finished_cake)
                except StopIteration:
                    pass

                print(self.sARRAY) # yields [None][None][None]
                print(self.iARRAY) # yields [None][None][None]
                print(self.iVALUE) # yields 1-3

        pool.close()
        pool.join()

    def worker(self, item):

        with self.pLOCK:
            self.iVALUE.value += 1

        self.sARRAY.value[self.iVALUE.value] = item # value: 'item 1'
        self.iARRAY.value[self.iVALUE.value] = 2
        # on next child process run
        print(self.iVALUE.value) # prints 1
        print(self.sARRAY.value) # prints ['item 1'][None][None]
        print(self.iARRAY.value) # prints [2][None][None]

        sleep(0.5)

        ...
        with self.pLOCK:
            self.iVALUE.value -= 1

UPDATE 4 Changing

pool.apply_async(self.worker, (next(item),))

To

x = pool.apply_async(self.worker, (next(item),))
print(x.get())

Or

x = pool.apply(self.worker, (next(item),))
print(x)

And in self.worker(), returning self.iARRAY.value or self.sARRAY.value does return a variable that has the updated value. This is not what I want to achieve though; it doesn't even require the use of Array to achieve...

So I need to clarify. In self.worker() I'm doing important heavy lifting that can take a long time, and I need to send information back to the main process while it runs, e.g. the progress, before the final return value is sent to the callback.

I'm not trying to return the finished result to the main method; that is to be handled by the callback function. I see now that omitting the callback in the code example could have given a different impression, sorry.
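To make the progress-reporting goal concrete, here is a minimal sketch of what I mean, assuming plain Process objects instead of the Pool and a shared array with one counter per worker. The names long_task, run_with_progress and the start_method parameter are made up for illustration and are not part of my real code; the sleep calls stand in for the heavy lifting.

```python
import multiprocessing
import time


def long_task(progress, slot, steps):
    """Stand-in for the heavy lifting: publish progress after every step."""
    for done in range(1, steps + 1):
        time.sleep(0.01)        # placeholder for real work
        progress[slot] = done   # index assignment writes to shared memory


def run_with_progress(start_method=None, workers=2, steps=5):
    """Run the workers and poll their progress from the parent while they run."""
    # start_method=None means "use the platform default" (an illustrative knob).
    ctx = multiprocessing.get_context(start_method)
    progress = ctx.Array("i", workers)  # one zero-initialised counter per worker
    procs = [ctx.Process(target=long_task, args=(progress, slot, steps))
             for slot in range(workers)]
    for proc in procs:
        proc.start()
    snapshots = []
    while any(proc.is_alive() for proc in procs):
        snapshots.append(progress[:])   # what the parent sees mid-run
        time.sleep(0.005)
    for proc in procs:
        proc.join()
    return snapshots, progress[:]


if __name__ == "__main__":
    seen, final = run_with_progress()
    print(seen[-1] if seen else None, final)
```

The parent never waits for a return value here; it only reads the shared counters while the children are still alive.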

UPDATE 5 Re: Use numpy array in shared memory for multiprocessing

I've seen that answer and tried a variation of it using initializer() with a global variable, passing the array through initargs, with no luck. I don't understand the use of numpy or of "with closing()", and that code doesn't seem to access "arr" inside main(); shared_arr is used, but only after p.join().

As far as I can see, the array is declared, then turned into a numpy array and inherited through init(x). My code should behave the same as that code so far.

One major difference seems to be how the array is accessed

I've only succeeded in setting and getting array values through the value attribute. This is what happens when I try indexing instead:

self.iARRAY[0] = 1 # instead of iARRAY.value = [None] * 3
self.iARRAY[1] = 1
self.iARRAY[2] = 1

print(self.iARRAY) # prints <SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_int_Array_3 object at 0x7f9cfa8538c8>>

And I can't find a way to access and check the values (after indexing like this, the attribute "value" gives an unknown method error).
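For reference, a minimal sketch of how the values can be read back, using only the standard library. As far as I can tell, slicing the wrapper copies the shared contents into a plain list, and only c_char arrays expose a value attribute. The variable names below are illustrative.

```python
import ctypes
import multiprocessing

# An int array of size 3, zero-initialised. Elements are read and written by index.
numbers = multiprocessing.Array(ctypes.c_int, 3)
numbers[0] = 1
numbers[1] = 1
numbers[2] = 1
contents = numbers[:]                  # slicing copies the shared values into a list
has_value = hasattr(numbers, "value")  # a c_int array has no "value" attribute

# Assigning numbers.value = [...] does not raise, but it only attaches an ordinary
# Python attribute to the wrapper object; the shared buffer is left untouched.
numbers.value = [None] * 3
still_shared = numbers[:]

# A c_char array is the special case that does expose .value (as bytes).
text = multiprocessing.Array("c", 16)
text.value = b"item 1"

print(contents, has_value, still_shared, text.value)
```

This would also explain the printout above: printing the wrapper shows the object itself, while numbers[:] shows what is stored in it.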

Another major difference from that code is the prevention of data copying using the get_obj().

Isn't this a numpy issue?

assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

Not sure how to make use of that.

def worker(self,item):

    with self.pLOCK:
        self.iVALUE.value += 1

    self.sARRAY.value[self.iVALUE.value] = item # value: 'item 1'

    with self.iARRAY.get_lock():
        arr = self.iARRAY.get_obj()
        arr[self.iVALUE.value] = 2   # and now ??? 

    sleep(0.5)

    ...
    with self.pLOCK:
        self.iVALUE.value -= 1

UPDATE 6 I've tried using multiprocessing.Process() instead of Pool() but the result is the same.

user1267259
  • here's a [code example that shares `mp.Array` between a pool of processes](http://stackoverflow.com/a/7908612/4279) – jfs Jan 20 '15 at 19:43
  • it seems you got lost in broad daylight. Start with [a hello world example from the docs](https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes), try to change it until it breaks. Describe using words what do you want to happen and what happens instead. – jfs Jan 21 '15 at 17:29
  • +1 for reminding me to go back to basics. But I'm not sure what to make of "Describe using words what do you want to happen and what happens instead." Did you read my question? – user1267259 Jan 21 '15 at 21:21
  • yes. I've read your question. The description is wrong. Otherwise the hello world example would be enough. – jfs Jan 22 '15 at 03:59
  • I suppose the question could be improved. What would be a more appropriate description? And should I edit the original question? – user1267259 Jan 22 '15 at 13:27

2 Answers

1

The correct way to declare the global variable (in this case a class attribute):

iARRAY = multiprocessing.Array(ctypes.c_int, range(3))

The correct way to set a value:

self.iARRAY[n] = x

The correct way to get a value:

self.iARRAY[n]

I'm not sure why the examples I'd seen used Array(ctypes.c_int, 3) and iARRAY.value[n], but in this case that was wrong.
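A minimal sketch of the pattern above, assuming one child process per array slot. The helper names bump and run_demo and the start_method parameter are hypothetical and only there to keep the example self-contained. As far as I understand the docs, passing an integer size such as Array(ctypes.c_int, 3) is also valid and gives a zero-filled array; it is the .value[n] access that does not reach the shared memory.

```python
import ctypes
import multiprocessing


def bump(shared, index):
    """Child-side work: update the shared array by index (no .value)."""
    with shared.get_lock():   # make the read-modify-write atomic
        shared[index] += 10


def run_demo(start_method=None):
    """Start one child per slot and return what the parent sees afterwards."""
    # start_method=None means "use the platform default" (an illustrative knob).
    ctx = multiprocessing.get_context(start_method)
    shared = ctx.Array(ctypes.c_int, range(3))  # initial contents: 0, 1, 2
    children = [ctx.Process(target=bump, args=(shared, i)) for i in range(3)]
    for child in children:
        child.start()
    for child in children:
        child.join()
    return shared[:]  # slicing copies the current contents into a list


if __name__ == "__main__":
    print(run_demo())  # the parent sees the children's writes
```

With indexing, the parent reads the values written by the children after join(), which is the behaviour the question was after.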

user1267259
0

This is your problem:

while True:
    try:
        pool.apply_async(self.worker, (next(item),))
    except StopIteration:
        pass

    print(self.sARRAY) # yields [None][None][None]
    print(self.iARRAY) # yields [None][None][None]
    print(self.iVALUE) # yields 1-3

The function pool.apply_async() starts the subprocess running and returns immediately. You don't appear to be waiting for the workers to finish. For that, you might consider using a barrier.
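As a rough sketch of the waiting part (not a barrier; this swaps in the AsyncResult objects that apply_async() returns), assuming the array is handed to the pool workers through an initializer. The names _init_worker, work and run_pool and the start_method parameter are hypothetical.

```python
import ctypes
import multiprocessing

_shared = None  # set in each pool worker by _init_worker


def _init_worker(shared):
    """Pool initializer: remember the shared array in a module-level global."""
    global _shared
    _shared = shared


def work(index):
    """Write a result into the shared array and return the slot that was used."""
    _shared[index] = index * index
    return index


def run_pool(start_method=None, n=4):
    """Submit n tasks, then wait for each one before reading the array."""
    # start_method=None means "use the platform default" (an illustrative knob).
    ctx = multiprocessing.get_context(start_method)
    shared = ctx.Array(ctypes.c_int, n)  # zero-initialised
    with ctx.Pool(2, initializer=_init_worker, initargs=(shared,)) as pool:
        pending = [pool.apply_async(work, (i,)) for i in range(n)]
        before = shared[:]      # read right after submitting; may still be zeros
        for result in pending:
            result.get()        # blocks until that task has finished
        after = shared[:]       # all writes are done at this point
    return before, after


if __name__ == "__main__":
    print(run_pool())
```

Whether "before" already contains results depends on timing; "after" is read only once every task has completed.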

Kevin
  • I'm looking into the barrier method, but could I trouble you for an example of what you're thinking? As I understand it, you're saying I should put a "wait()" after pool.apply_async(). If that's the case, why does iVALUE return a value? And why does Manager() work? Also, I changed apply_async() to apply(), yet the problem persists. – user1267259 Jan 20 '15 at 17:53