
I'm playing with Python's multiprocessing module and shared memory. I am able to use a shared memory object with Process, but not with Pool. I added a callback to the Pool tasks, and the callback doesn't seem to be invoked either.

from multiprocessing import Array, Pool, Process

def flip(x,a):
    a[x] = 0 if a[x] else 1
    return (x, a[x])

def cb(result):
    print(result)

if __name__ == '__main__':

    # size of array
    N = 10

    # shared array - N bytes - unsynchronized - initialized to zeros
    a = Array('B', N, lock=False)

    # flip values to ones using Process
    processes = [Process(target=flip, args=(x, a)) for x in range(N)]
    for p in processes: p.start()
    for p in processes: p.join()
    print([a[i] for i in range(N)])    

    # flip values back to zeros using Pool
    pool = Pool(processes=4)
    for x in range(N):
        pool.apply_async(flip, args=(x, a), callback=cb)
    pool.close()
    pool.join()
    print([a[i] for i in range(N)])

I'd expect my shared array to be printed once with all 1s, followed by one line per task printed by the callback, and then the array again with all 0s. Instead I get this:

[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

Why isn't the Pool running the tasks?

Taking out shared memory for the sake of a minimal example:

from multiprocessing import Pool

def f(x):
    return x

def cb(result):
    print('cb',result)

if __name__ == '__main__':

    pool = Pool(processes=4)
    pool.apply_async(f, range(10), callback=cb)
    pool.close()
    pool.join()

I'd expect this to print the numbers 0 to 9 on separate lines, but it outputs nothing.

If I replace the apply_async call immediately above with this:

pool.apply_async(f, args=[10], callback=cb)

I get the output:

cb 10

Replacing the [10] with range(10), [1,2,3], [(1),(2),(3)], or ([1],[2],[3]) yields no output.
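A likely explanation for this last observation, shown as a minimal sketch: `apply_async` submits a single task that calls `f(*args)`. So `args=[10]` becomes `f(10)`, while `range(10)` becomes `f(0, 1, ..., 9)`, `[1,2,3]` (and `[(1),(2),(3)]`, which is the same list) becomes `f(1, 2, 3)`, and `([1],[2],[3])` becomes `f([1], [2], [3])`. Those calls raise `TypeError` inside the worker, and `callback` is only invoked on success, so nothing is printed unless an `error_callback` is given or `.get()` is called on the returned result. The helper name `run_demo` is hypothetical; the sketch assumes Python 3 (`error_callback` and using `Pool` as a context manager).

```python
from multiprocessing import Pool

def f(x):
    return x

def cb(result):
    print('cb', result)

def run_demo():
    """Hypothetical helper: submit one malformed task and ten correct ones."""
    errors = []
    with Pool(processes=4) as pool:
        # apply_async calls f(*args): range(10) turns into f(0, 1, ..., 9),
        # which raises TypeError in the worker. cb is skipped on failure;
        # the exception only reaches error_callback (or bad.get()).
        bad = pool.apply_async(f, range(10), callback=cb,
                               error_callback=errors.append)
        # One task per value: each task calls f(x) and triggers cb once.
        good = [pool.apply_async(f, (x,), callback=cb) for x in range(10)]
        values = [r.get() for r in good]  # blocks until every task is done
        bad.wait()                        # make sure the failure was recorded
    return values, errors

if __name__ == '__main__':
    values, errors = run_demo()
    print(values)
    print(errors)
```

If the goal is to run `f` once per element, `pool.map(f, range(10))` or `pool.map_async(f, range(10), callback=cb)` is the usual tool; note that the `map_async` callback receives the whole result list once rather than one value per task.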

CAB
  • for x in range(N): pool.apply .... Are you applying flip 4 times? 1 - 0 - 1 - 0 - 1 (ends in 1) – chapelo Oct 27 '16 at 20:17
  • @chapelo - 4 is the number of workers in the Pool. The `for` should create 10 tasks for them to do. Because `cb` is never invoked, it seems none of the tasks are being executed. – CAB Oct 27 '16 at 20:26
  • Multiprocessing works differently on Windows and "unixy" systems such as Linux. What platform do you use? – tdelaney Oct 27 '16 at 20:45
  • @tdelaney At the moment Windows 7 Pro, Python 3.4m – CAB Oct 27 '16 at 20:46
  • `apply_async` returns an `AsyncResult` object. You should put those in a list and call `result.get()` on each one of them to see if an error is raised. I get `RuntimeError: c_ubyte_Array_10 objects should only be shared between processes through inheritance` on my Linux machine (see @zvone's link). I don't know if Windows will be different. – tdelaney Oct 27 '16 at 21:12
  • If you get the same thing, or if that linked answer works for you, we can mark this a dup. – tdelaney Oct 27 '16 at 21:13
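Following tdelaney's suggestion, here is a minimal sketch that keeps the `AsyncResult` objects and calls `.get()` on each, so the error that otherwise disappears is raised in the parent. It reuses `flip` from the question; `collect_outcomes` is a hypothetical helper. The failure happens while the task is being pickled: the shared array would have to travel with every task, and shared ctypes objects refuse to be pickled except while a child process is being started. That is why passing `a` in `Process(args=...)` works but passing it to `apply_async` does not. Whether Windows 7 / Python 3.4 reports exactly the same message as tdelaney's Linux machine is not confirmed here.

```python
from multiprocessing import Array, Pool

def flip(x, a):
    a[x] = 0 if a[x] else 1
    return (x, a[x])

def collect_outcomes(n=10, workers=4):
    """Hypothetical helper: submit flip() tasks and report what each one did."""
    a = Array('B', n, lock=False)  # n unsigned bytes, zero-initialized
    outcomes = []
    with Pool(processes=workers) as pool:
        results = [pool.apply_async(flip, args=(x, a)) for x in range(n)]
        for r in results:
            try:
                outcomes.append(r.get(timeout=10))
            except RuntimeError as exc:
                # Raised while pickling the task: the shared array can only be
                # handed to child processes when they are created.
                outcomes.append(exc)
    return outcomes

if __name__ == '__main__':
    for outcome in collect_outcomes():
        print(repr(outcome))
```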

1 Answer

Multiprocessing usually pays off only when the data is very large, and in that case it doesn't make sense to assign one process to each piece of data, as you did with N processes for an N-element array.

Consider these two approaches:

1) Each process handles a chunk of the array. See flip_many() and partition().

2) Each piece of data is mapped to a pool worker. See flip_one().

The rest of the code is very close to your original code.

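from multiprocessing import Array, Pool, Process

# flip_many() and flip_one() read the global `a`, which is created under
# `if __name__ == '__main__'`. Child processes only see it when they are
# forked from the parent ('fork' start method, long the default on Linux).
# With 'spawn' (the default on Windows) `a` is not defined in the children.

def flip_many(start_idx, end_idx):
    # end_idx is inclusive, matching the ranges produced by partition()
    for idx in range(start_idx, end_idx + 1):
        a[idx] = not(a[idx])

def flip_one(idx):
    a[idx] = not(a[idx])
    return idx, a[idx]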

def cb(result):
    print(result)

def partition(range_, n):
    start, end = range_
    size = (end - start) // n
    ranges = []
    for _ in range(n):
        ranges.append((start, start+size-1))
        start += size
    if ranges[-1][1] != end-1:
        ranges[-1] = (ranges[-1][0], end-1)
    return ranges    

if __name__ == '__main__':

    # size of array
    N = 10
    N_procs = 2
    ranges = partition( (0, N), N_procs )

    # shared array - N bytes - unsynchronized - initialized to zeros
    a = Array('B', N, lock=False)
    print([a[i] for i in range(N)], "elements of array initialized to 0")    

    # flip values to ones using Process

    processes = []
    for i in range(N_procs):
        # ranges[i] is already a (start, end) tuple; this also runs on Python 3.4
        p = Process(target=flip_many, args=ranges[i])
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print([a[i] for i in range(N)], "First flip by N_procs processes, should be 1")

    # flip values back to zeros using Pool
    pool = Pool()
    indices = range(N)
    pool.map(flip_one, indices)
    print([a[i] for i in range(N)], "Second flip by the pool.map ... 0")

    pool.map(flip_one, indices, chunksize=N // N_procs)
    print([a[i] for i in range(N)], "Third flip by the pool.map ... 1")

    pool.map_async(flip_one, indices, callback=cb)
    print([a[i] for i in range(N)], "Fourth flip by the pool.map_async ... 0")
    print("    Due to the async nature, the flip may not be reflected until .join()")
    print("    but the callback prints the correct results:")

    pool.close()
    pool.join()
    print([a[i] for i in range(N)], "Content after the join... 0")

chapelo
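Since the asker is on Windows, where pool workers are started with 'spawn' and never see the parent's global `a`, a common alternative is to hand the shared array to each worker when it is created, through `Pool`'s `initializer` and `initargs` parameters. A minimal sketch under that assumption; `init_worker`, `shared` and `flip_with_pool` are hypothetical names, and `flip_one` mirrors the one in the answer.

```python
from multiprocessing import Array, Pool

# Module-level name that each worker fills in through the initializer.
shared = None

def init_worker(arr):
    """Runs once in every worker process and stores the shared array."""
    global shared
    shared = arr

def flip_one(idx):
    shared[idx] = 0 if shared[idx] else 1
    return idx, shared[idx]

def flip_with_pool(n, workers=2):
    a = Array('B', n, lock=False)  # n unsigned bytes, zero-initialized, no lock
    # initargs are transferred when the workers are created, which is the
    # "inheritance" path the RuntimeError message asks for.
    with Pool(processes=workers, initializer=init_worker, initargs=(a,)) as pool:
        results = pool.map(flip_one, range(n))
    return list(a), results

if __name__ == '__main__':
    values, results = flip_with_pool(10)
    print(values)
    print(results)
```

The same idea applies to `Process`: passing the array in `args`, as the question's first loop does, is allowed because those arguments are transferred when the child is started.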