
I've spent some time researching the Python multiprocessing module, its use of os.fork, and shared memory using Array in multiprocessing, for a piece of code I'm writing.

The project itself boils down to this: I have several MxN arrays (let's suppose I have 3 arrays called A, B, and C) that I need to process to calculate a new MxN array (called D), where:

D[i,j] = f(A[i,j], B[i,j], C[i,j])

The function f is such that standard vectorised operations cannot be applied. This task is what I believe is called "embarrassingly parallel". Given the overhead involved in multiprocessing, I am going to break the calculation of D into blocks; for example, if D were 8x8 and I had 4 processes, each process would be responsible for a 4x4 "chunk" of D.
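
In the test script further down I actually only split along the m direction, so each process owns a band of whole rows rather than a square chunk. A toy sketch of the split I have in mind (np.array_split is used purely for illustration; the script itself builds the boundaries with np.linspace):

import numpy as np

m = 8   # rows of D
N = 4   # number of processes

# Each process gets a contiguous block of row indices of D
row_blocks = np.array_split(np.arange(m), N)
print row_blocks   # [array([0, 1]), array([2, 3]), array([4, 5]), array([6, 7])]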

Now, the size of the arrays has the potential to be very large (on the order of several GB), so I want all of the arrays to use shared memory (even array D, which will have sub-processes writing to it). I believe I have a solution to the shared-array issue using a modified version of what is presented here.

However, from an implementation perspective it would be nice to place arrays A, B, and C into a dictionary. What is unclear to me is whether doing this will cause the arrays to be copied in memory when the dictionary's reference count is incremented within each sub-process.
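
The kind of sanity check I have in mind is a minimal sketch like the one below (assuming the fork start method on Linux; the names child, base, and d are just illustrative): hand a child process the dictionary and confirm that a write made through it is visible back in the parent.

import multiprocessing as mp
import ctypes
import numpy as np

def child(d):
    # The dict itself lives in the child's (copy-on-write) memory, but its
    # values wrap the shared mmap'd buffer, so this write should be visible
    # to the parent.
    d['source'][0, 0] = 42.0

if __name__ == '__main__':
    base = mp.Array(ctypes.c_double, 4, lock=False)
    d = {'source': np.frombuffer(base).reshape(2, 2)}

    p = mp.Process(target=child, args=(d,))
    p.start()
    p.join()

    print d['source'][0, 0]   # 42.0 -> the array was shared, not copied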

To try to answer this, I wrote a little test script (see below) and tried running it using valgrind --tool=massif to track memory usage. However, I am not quite clear on how to interpret the results from it. Specifically, does each massif.out file (where the number of files is equal to the number of sub-processes created by my test script + 1) record the memory used by that process alone (i.e. do I need to sum them all up to get the total memory usage), or do I just need to consider the massif.out associated with the parent process?

On a side note: one of my shared-memory arrays has the sub-processes writing to it. I know this should normally be avoided, especially since I am not using locks to ensure that only one sub-process writes to the array at any given time. Is this a problem? My thinking is that since the order in which the array is filled is irrelevant, the calculation of any index is independent of every other index, and no sub-process ever writes to the same array index as any other process, there should not be any race conditions. Is this correct?
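
For completeness, if it turns out I do need to serialise the writes, my understanding is that the lock=True variant of mp.Array exposes its lock via get_lock(), so a locked worker could look something like this (toy sizes; worker_with_lock is just an illustrative name):

import multiprocessing as mp
import ctypes
import numpy as np

m, n = 4, 4

# lock=True returns a synchronized wrapper: get_obj() is the raw ctypes
# buffer, get_lock() is the RLock guarding it.
locked_base = mp.Array(ctypes.c_double, m * n, lock=True)
locked_view = np.frombuffer(locked_base.get_obj()).reshape(m, n)

def worker_with_lock(rows):
    for i in rows:
        with locked_base.get_lock():   # serialise access to the shared buffer
            locked_view[i, :] = i

if __name__ == '__main__':
    procs = [mp.Process(target=worker_with_lock, args=(rows,))
             for rows in (range(0, 2), range(2, 4))]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print locked_view

I have left this out of the test script below because, as argued above, the writes never overlap.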

#! /usr/bin/env python

import multiprocessing as mp
import ctypes
import numpy as np
import time
import sys
import timeit

def shared_array(shape=None, lock=False):
    """
    Form a shared memory numpy array.

    https://stackoverflow.com/questions/5549190/is-shared-readonly-data-copied-to-different-processes-for-python-multiprocessing 
    """

    shared_array_base = mp.Array(ctypes.c_double, shape[0]*shape[1], lock=lock)

    # With lock=True, mp.Array returns a synchronized wrapper whose raw
    # ctypes buffer is accessed via get_obj(); with lock=False the raw
    # ctypes array is returned directly and can be wrapped as-is.
    if lock:

        shared_array = np.frombuffer(shared_array_base.get_obj())

    else:

        shared_array = np.frombuffer(shared_array_base)

    shared_array = shared_array.reshape(*shape)

    return shared_array

def worker(indices=None, queue=None, data=None):

    # Loop over each index and "crunch" some data
    for i in indices:

        time.sleep(0.01)

        if data is not None:

            data['sink'][i, :] = data['source'][i, :] + i

        # Place the ID of the completed index into the queue
        queue.put(i)

if __name__ == '__main__':

    # Set the start time
    begin = timeit.default_timer()

    # Size of arrays (m x n)
    m = 1000

    n = 1000

    # Number of Processors
    N = 2

    # Create a queue to use for tracking progress
    queue = mp.Queue()

    # Create dictionary and shared arrays
    data = dict()

    # Form the shared arrays: 'source' with a lock, 'sink' without one
    data['source'] = shared_array(shape=(m, n), lock=True)

    data['sink']   = shared_array(shape=(m, n), lock=False)

    # Create a list of the indices associated with the m direction
    indices = range(0, m)

    # Parse the indices list into range blocks; each process will get a block
    indices_blocks = [int(i) for i in np.linspace(0, m, N+1)]

    # Initialize a list for storing created sub-processes
    procs = []

    # Print the initialization time-stamp
    print 'Time to initialize: {}'.format(timeit.default_timer() - begin)

    # Create and start each sub-process
    for i in range(1, N+1):

        # Start of the block
        start = indices_blocks[i-1]

        # End of the block
        end   = indices_blocks[i]

        # Create the sub-process
        procs.append(mp.Process(target=worker,
                                args=(indices[start:end], queue, data)))

        # Kill the sub-process if/when the parent is killed
        procs[-1].daemon=True

        # Start the sub-process
        procs[-1].start()


    # Initialize a list to store the indices that have been processed
    completed = []

    # Loop while any of the sub-processes are still alive
    while any(i.is_alive() for i in procs):

        # Read the queue, append completed indices, and print the progress
        while not queue.empty():

            done = queue.get()

            if done not in completed:

                completed.append(done)

            message = "\rCompleted {:.2%}".format(float(len(completed))/len(indices))

            sys.stdout.write(message)

            sys.stdout.flush()

    print ''

    # Join all the sub-processes
    for p in procs:

        p.join()    

    # Print the run time and the modified sink array
    print 'Running time: {}'.format(timeit.default_timer() - begin)

    print data['sink']
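
    # Quick sanity check (illustrative only): the toy f used in worker() *can*
    # be vectorised, so compare the sink against the broadcast equivalent
    # (the real f, of course, cannot be checked this way).
    print np.allclose(data['sink'], data['source'] + np.arange(m)[:, None])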

Edit: It seems I've run into another issue; specifically, a value of n equal to 3 million results in the kernel killing the process (I assume due to a memory issue). This appears to be related to how shared_array() works (I can create np.zeros arrays of the same size without a problem). After playing with it a bit I get the traceback shown below. I'm not entirely sure what is causing the memory allocation error, but a quick Google search turns up discussions about how mmap maps virtual address space, which I'm guessing is smaller than the amount of physical memory a machine has?

Traceback (most recent call last):
  File "./shared_array.py", line 66, in <module>
    data['source'] = shared_array(shape=(m, n), lock=True)
  File "./shared_array.py", line 17, in shared_array
    shared_array_base = mp.Array(ctypes.c_double, shape[0]*shape[1], lock=lock)
  File "/usr/apps/python/lib/python2.7/multiprocessing/__init__.py", line 260, in Array
    return Array(typecode_or_type, size_or_initializer, **kwds)
  File "/usr/apps/python/lib/python2.7/multiprocessing/sharedctypes.py", line 120, in Array
    obj = RawArray(typecode_or_type, size_or_initializer)
  File "/usr/apps/python/lib/python2.7/multiprocessing/sharedctypes.py", line 88, in RawArray
    obj = _new_value(type_)
  File "/usr/apps/python/lib/python2.7/multiprocessing/sharedctypes.py", line 68, in _new_value
    wrapper = heap.BufferWrapper(size)
  File "/usr/apps/python/lib/python2.7/multiprocessing/heap.py", line 243, in __init__
    block = BufferWrapper._heap.malloc(size)
  File "/usr/apps/python/lib/python2.7/multiprocessing/heap.py", line 223, in malloc
    (arena, start, stop) = self._malloc(size)
  File "/usr/apps/python/lib/python2.7/multiprocessing/heap.py", line 120, in _malloc
    arena = Arena(length)
  File "/usr/apps/python/lib/python2.7/multiprocessing/heap.py", line 82, in __init__
    self.buffer = mmap.mmap(-1, size)
mmap.error: [Errno 12] Cannot allocate memory
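
For what it's worth, my back-of-the-envelope arithmetic for what a single shared array of that size asks mmap for (assuming 8-byte C doubles):

import ctypes

m, n = 1000, 3000000                        # the sizes that trigger the error
nbytes = m * n * ctypes.sizeof(ctypes.c_double)
print nbytes / float(1024 ** 3)             # ~22.4 GiB per array

The test script creates two such arrays, so roughly 45 GiB of shared memory is being requested up front, which I suspect is simply more than my machine can back.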
