13

I have a multiprocessing job where I'm queuing read-only numpy arrays as part of a producer-consumer pipeline.

Currently they're being pickled, because that is the default behaviour of multiprocessing.Queue, and this slows down performance.

Is there any pythonic way to pass references to shared memory instead of pickling the arrays?

Unfortunately the arrays are being generated after the consumer is started, and there is no easy way around that. (So the global variable approach would be ugly...).

[Note that in the following code we are not expecting h(x0) and h(x1) to be computed in parallel. Instead we expect h(x0) and g(h(x1)) to be computed in parallel (like pipelining in a CPU).]

from multiprocessing import Process, Queue
import numpy as np

class __EndToken(object):
    pass

def parrallel_pipeline(buffer_size=50):
    def parrallel_pipeline_with_args(f):
        def consumer(xs, q):
            for x in xs:
                q.put(x)
            q.put(__EndToken())

        def parallel_generator(f_xs):
            q = Queue(buffer_size)
            consumer_process = Process(target=consumer,args=(f_xs,q,))
            consumer_process.start()
            while True:
                x = q.get()
                if isinstance(x, __EndToken):
                    break
                yield x

        def f_wrapper(xs):
            return parallel_generator(f(xs))

        return f_wrapper
    return parrallel_pipeline_with_args


@parrallel_pipeline(3)
def f(xs):
    for x in xs:
        yield x + 1.0

@parrallel_pipeline(3)
def g(xs):
    for x in xs:
        yield x * 3

@parrallel_pipeline(3)
def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0,1,(500,2000))

if __name__ == "__main__":
    rs = f(g(h(xs())))
    for r in rs:
        print r
N. McA.
  • Can you share some code? – Alicia Garcia-Raboso Jul 29 '16 at 19:19
  • Hm, not the actual code. Will mock up something similar. – N. McA. Jul 29 '16 at 19:22
  • As long as it is a [Minimal, Complete, and Verifiable example](http://stackoverflow.com/help/mcve)... – Alicia Garcia-Raboso Jul 29 '16 at 19:24
  • @AlbertoGarcia-Raboso Hope that is suitable? – N. McA. Jul 29 '16 at 19:33
  • You're asking about avoiding pickling, yet `pickle` is nowhere in your code. Please clarify. – Alicia Garcia-Raboso Jul 29 '16 at 19:53
  • @AlbertoGarcia-Raboso When numpy arrays are passed into a multiprocessing.Queue they are pickled and unpickled. – N. McA. Jul 29 '16 at 20:22
  • Not sure if it can apply to your case but I guess you can avoid pickling your arrays by making a copy of them in shared memory, it might be possible with [multiprocessing.sharedctypes](https://docs.python.org/3.5/library/multiprocessing.html#module-multiprocessing.sharedctypes) or with [multiprocessing.Array](https://docs.python.org/3.5/library/multiprocessing.html#multiprocessing.Array) (if the array you want to share contains more than one dimension you might need to flatten it to use these functions). – mgc Jul 30 '16 at 00:45
  • @mgc Using `multiprocessing.Array` will *kill* the efficiency of the code. It is meant to allow several processes to modify the same shared array, but this means that every single array access takes a big hit in efficiency. I think the best bet is `sharedctypes`. – Bakuriu Jul 31 '16 at 21:22
  • @Bakuriu really? Do you mean because of locking? If so, can't that be disabled, and further, seeing as the current implementation does full serialisation do you think it's actually going to be a hit? https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Array – N. McA. Jul 31 '16 at 21:26
  • Possible duplicate of http://stackoverflow.com/questions/7894791/use-numpy-array-in-shared-memory-for-multiprocessing – Phillip Aug 01 '16 at 06:08
  • Is this any use: http://parad0x.org/git/python/shared-array/about ? If you use `create` to make the shared array, you only need to send its location down the (pickled) queue. – mdurant Aug 01 '16 at 16:18
  • Are we dealing with big arrays? A huge number of small arrays? Is making copies completely out of the question, or could be ok if it can be done faster than pickling? – shx2 Aug 04 '16 at 06:13
  • @shx2 Actually are you sure? Note that the generator passed to the async consumer is actually f(xs), so the next() function called in the Process is next(f(xs())) which is where the work of f is done. – N. McA. Aug 04 '16 at 18:25
  • @shx2 Also we are dealing with a very large number of quite large arrays. Say the ndarrays are (256, 3, 256, 256) and there are ~100e6 of them. – N. McA. Aug 04 '16 at 18:31
  • Relevant: http://stackoverflow.com/q/14416130/513688 – Andrew Aug 04 '16 at 21:34
  • What about using the sharedmem numpy module? – bpachev Aug 06 '16 at 01:42
  • I just added an `ArrayQueue` class to my answer (second section), which is pretty close to what you asked for. This creates a queue similar to a standard multiprocessing.Queue, but it also creates and manages a pool of numpy ndarrays backed by shared memory. Whenever you push a numpy ndarray onto the ArrayQueue, it copies the ndarray to a shared-memory array and puts the id of that array onto the queue. Then when you get a value from the queue, ArrayQueue retrieves the id, copies data from the shared-memory array into a local ndarray, and then returns that. – Matthias Fripp Aug 07 '16 at 22:41

3 Answers

21

Sharing memory between threads or processes

Use threading instead of multiprocessing

Since you're using numpy, you can take advantage of the fact that the global interpreter lock is released during numpy computations. This means you can do parallel processing with standard threads and shared memory, instead of multiprocessing and inter-process communication. Here's a version of your code, tweaked to use threading.Thread and Queue.Queue instead of multiprocessing.Process and multiprocessing.Queue. This passes a numpy ndarray via a queue without pickling it. On my computer, this runs about 3 times faster than your code. (However, it's only about 20% faster than the serial version of your code. I have suggested some other approaches further down.)

from threading import Thread
from Queue import Queue
import numpy as np

class __EndToken(object):
    pass

def parallel_pipeline(buffer_size=50):
    def parallel_pipeline_with_args(f):
        def consumer(xs, q):
            for x in xs:
                q.put(x)
            q.put(__EndToken())

        def parallel_generator(f_xs):
            q = Queue(buffer_size)
            consumer_process = Thread(target=consumer,args=(f_xs,q,))
            consumer_process.start()
            while True:
                x = q.get()
                if isinstance(x, __EndToken):
                    break
                yield x

        def f_wrapper(xs):
            return parallel_generator(f(xs))

        return f_wrapper
    return parallel_pipeline_with_args

@parallel_pipeline(3)
def f(xs):
    for x in xs:
        yield x + 1.0

@parallel_pipeline(3)
def g(xs):
    for x in xs:
        yield x * 3

@parallel_pipeline(3)
def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0,1,(500,2000))

rs = f(g(h(xs())))
%time print sum(r.sum() for r in rs)  # 12.2s
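
To make the "without pickling" point concrete, here is a minimal sketch, written in Python 3 syntax (so `queue` rather than the Python 2 `Queue` module used above). The names `producer`, `arr` and `received` are just illustrative. It checks that the object coming out of a thread queue is the very same ndarray that went in, so no copy or serialization took place:

```python
import queue
import threading

import numpy as np


def producer(q, arr):
    # a thread queue stores a reference to the object, not a serialized copy
    q.put(arr)


q = queue.Queue()
arr = np.arange(6.0).reshape(2, 3)

t = threading.Thread(target=producer, args=(q, arr))
t.start()
received = q.get()
t.join()

# identity check: the consumer holds the producer's array itself
same_object = received is arr
print(same_object)
```

The flip side is that producer and consumer share the data, so this is only safe if neither side modifies an array after handing it over (which matches the read-only arrays in the question).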

Store numpy arrays in shared memory

Another option, close to what you requested, would be to continue using the multiprocessing package, but pass data between processes using arrays stored in shared memory. The code below creates a new ArrayQueue class to do that. The ArrayQueue object should be created before spawning subprocesses. It creates and manages a pool of numpy arrays backed by shared memory. When a result array is pushed onto the queue, ArrayQueue copies the data from that array into an existing shared-memory array, then passes the id of the shared-memory array through the queue. This is much faster than sending the whole array through the queue, since it avoids pickling the arrays. This has similar performance to the threaded version above (about 10% slower), and may scale better if the global interpreter lock is an issue (i.e., you run a lot of python code in the functions).

from multiprocessing import Process, Queue, Array
import numpy as np

class ArrayQueue(object):
    def __init__(self, template, maxsize=0):
        if type(template) is not np.ndarray:
            raise ValueError('ArrayQueue(template, maxsize) must use a numpy.ndarray as the template.')
        if maxsize == 0:
            # this queue cannot be infinite, because it will be backed by real objects
            raise ValueError('ArrayQueue(template, maxsize) must use a finite value for maxsize.')

        # find the size and data type for the arrays
        # note: every ndarray put on the queue must be this size
        self.dtype = template.dtype
        self.shape = template.shape
        self.byte_count = len(template.data)

        # make a pool of numpy arrays, each backed by shared memory, 
        # and create a queue to keep track of which ones are free
        self.array_pool = [None] * maxsize
        self.free_arrays = Queue(maxsize)
        for i in range(maxsize):
            buf = Array('c', self.byte_count, lock=False)
            self.array_pool[i] = np.frombuffer(buf, dtype=self.dtype).reshape(self.shape)
            self.free_arrays.put(i)

        self.q = Queue(maxsize)

    def put(self, item, *args, **kwargs):
        if type(item) is np.ndarray:
            if item.dtype == self.dtype and item.shape == self.shape and len(item.data)==self.byte_count:
                # get the ID of an available shared-memory array
                id = self.free_arrays.get()
                # copy item to the shared-memory array
                self.array_pool[id][:] = item
                # put the array's id (not the whole array) onto the queue
                new_item = id
            else:
                raise ValueError(
                    'ndarray does not match type or shape of template used to initialize ArrayQueue'
                )
        else:
            # not an ndarray
            # put the original item on the queue (as a tuple, so we know it's not an ID)
            new_item = (item,)
        self.q.put(new_item, *args, **kwargs)

    def get(self, *args, **kwargs):
        item = self.q.get(*args, **kwargs)
        if type(item) is tuple:
            # unpack the original item
            return item[0]
        else:
            # item is the id of a shared-memory array
            # copy the array
            arr = self.array_pool[item].copy()
            # put the shared-memory array back into the pool
            self.free_arrays.put(item)
            return arr

class __EndToken(object):
    pass

def parallel_pipeline(buffer_size=50):
    def parallel_pipeline_with_args(f):
        def consumer(xs, q):
            for x in xs:
                q.put(x)
            q.put(__EndToken())

        def parallel_generator(f_xs):
            # the template must have the same shape and dtype as the arrays yielded by xs()
            q = ArrayQueue(template=np.zeros((500, 2000)), maxsize=buffer_size)
            consumer_process = Process(target=consumer,args=(f_xs,q,))
            consumer_process.start()
            while True:
                x = q.get()
                if isinstance(x, __EndToken):
                    break
                yield x

        def f_wrapper(xs):
            return parallel_generator(f(xs))

        return f_wrapper
    return parallel_pipeline_with_args


@parallel_pipeline(3)
def f(xs):
    for x in xs:
        yield x + 1.0

@parallel_pipeline(3)
def g(xs):
    for x in xs:
        yield x * 3

@parallel_pipeline(3)
def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0,1,(500,2000))

print "multiprocessing with shared-memory arrays:"
%time print sum(r.sum() for r in f(g(h(xs()))))   # 13.5s
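
If you are on Python 3.8 or later, the standard library's `multiprocessing.shared_memory` module offers another way to do the same kind of thing. The sketch below is only an illustration under that assumption, and the helper names `to_shared` and `from_shared` are made up for this example. The idea is that only a small descriptor (block name, shape, dtype) would need to travel through the queue, while the array data stays in shared memory. To keep the sketch short it attaches to the block from the same process instead of spawning a worker:

```python
from multiprocessing import shared_memory

import numpy as np


def to_shared(arr):
    """Copy arr into a new shared-memory block; return (handle, descriptor)."""
    shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
    view = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
    view[:] = arr
    del view  # release the buffer export so the block can be closed later
    # the descriptor is tiny and cheap to pickle, unlike the array itself
    return shm, (shm.name, arr.shape, arr.dtype.str)


def from_shared(descriptor):
    """Attach to an existing block by name and return a private copy."""
    name, shape, dtype = descriptor
    shm = shared_memory.SharedMemory(name=name)
    view = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    result = view.copy()
    del view  # must be released before close()
    shm.close()
    return result


original = np.arange(12, dtype=np.float64).reshape(3, 4)
shm, descriptor = to_shared(original)
restored = from_shared(descriptor)

# the creating side is responsible for freeing the block
shm.close()
shm.unlink()
print(np.array_equal(restored, original))
```

In a real pipeline the producer would put `descriptor` on a `multiprocessing.Queue`, and the block would need to stay alive until the consumer has read it, which is the same bookkeeping problem that the pool of free arrays in `ArrayQueue` solves.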

Parallel processing of samples instead of functions

The code above is only about 20% faster than a single-threaded version (12.2s vs. 14.8s for the serial version shown below). That is because each function is run in a single thread or process, and most of the work is done by xs(). The execution time for the example above is nearly the same as if you just ran %time print sum(1 for x in xs()).
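
You can check where the time goes on your own machine with a rough timing sketch like the one below (Python 3 syntax; `time_call` is a hypothetical helper written for this example). It compares generating one sample with applying h, g and f to one sample. The exact numbers will vary by machine and numpy version, so treat it as a way to measure rather than as a result:

```python
import time

import numpy as np


def time_call(func, repeats=5):
    """Return the average wall-clock seconds per call of func()."""
    start = time.perf_counter()
    for _ in range(repeats):
        func()
    return (time.perf_counter() - start) / repeats


sample = np.random.uniform(0, 1, (500, 2000))

# cost of producing one sample, as xs() does
generate_time = time_call(lambda: np.random.uniform(0, 1, (500, 2000)))
# cost of f(g(h(x))) on one sample: (x * x) * 3 + 1.0
arithmetic_time = time_call(lambda: (sample * sample) * 3 + 1.0)

print("generate one sample:      %.4f s" % generate_time)
print("h, g and f on one sample: %.4f s" % arithmetic_time)
```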

If your real project has many more intermediate functions and/or they are more complex than the ones you showed, then the workload may be distributed better among processors, and this may not be a problem. However, if your workload really does resemble the code you provided, then you may want to refactor your code to allocate one sample to each thread instead of one function to each thread. That would look like the code below (both threading and multiprocessing versions are shown):

import multiprocessing
import threading, Queue
import numpy as np

def f(x):
    return x + 1.0

def g(x):
    return x * 3

def h(x):
    return x * x

def final(i):
    return f(g(h(x(i))))

def final_sum(i):
    return f(g(h(x(i)))).sum()

def x(i):
    # produce sample number i
    return np.random.uniform(0, 1, (500, 2000))

def rs_serial(func, n):
    for i in range(n):
        yield func(i)

def rs_parallel_threaded(func, n):
    todo = range(n)
    q = Queue.Queue(2*n_workers)

    def worker():
        while True:
            try:
                # the global interpreter lock ensures only one thread does this at a time
                i = todo.pop()
                q.put(func(i))
            except IndexError:
                # none left to do
                q.put(None)
                break

    threads = []
    for j in range(n_workers):
        t = threading.Thread(target=worker)
        t.daemon=False
        threads.append(t)   # in case it's needed later
        t.start()

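    # every worker sends one None when it runs out of work, so keep reading
    # until all of them have finished; stopping at the first None could drop
    # results from workers that are still computing
    finished = 0
    while finished < n_workers:
        x = q.get()
        if x is None:
            finished += 1
        else:
            yield x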

def rs_parallel_mp(func, n):
    pool = multiprocessing.Pool(n_workers)
    return pool.imap_unordered(func, range(n))

n_workers = 4
n_samples = 1000

print "serial:"  # 14.8s
%time print sum(r.sum() for r in rs_serial(final, n_samples))
print "threaded:"  # 10.1s
%time print sum(r.sum() for r in rs_parallel_threaded(final, n_samples))

print "mp return arrays:"  # 19.6s
%time print sum(r.sum() for r in rs_parallel_mp(final, n_samples))
print "mp return results:"  # 8.4s
%time print sum(r_sum for r_sum in rs_parallel_mp(final_sum, n_samples))

The threaded version of this code is only slightly faster than the first example I gave, and only about 30% faster than the serial version (10.1s vs. 14.8s). That's not as much of a speedup as I would have expected; maybe Python is still getting partly bogged down by the GIL?

The multiprocessing version is significantly faster than your original multiprocessing code, primarily because all the functions are chained together in a single process instead of queueing (and pickling) intermediate results. It is still slower than the serial version, though, because every result array has to be pickled (in the worker process) and unpickled (in the main process) before imap_unordered returns it. If you can arrange for your pipeline to return aggregate results instead of the complete arrays, you avoid that pickling overhead, and the multiprocessing version is fastest: about 43% faster than the serial version (8.4s vs. 14.8s).
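
To get a feel for how much data the two variants push between processes, here is a small sketch (Python 3 syntax) that compares the pickled size of one full sample with the pickled size of its sum. It only measures payload size, not time, and it uses `pickle.HIGHEST_PROTOCOL`, which is an assumption and may differ from the protocol your multiprocessing setup actually uses:

```python
import pickle

import numpy as np

sample = np.random.uniform(0, 1, (500, 2000))

# what a worker has to send back when it returns the whole array
array_bytes = len(pickle.dumps(sample, protocol=pickle.HIGHEST_PROTOCOL))
# what a worker has to send back when it returns only an aggregate
sum_bytes = len(pickle.dumps(sample.sum(), protocol=pickle.HIGHEST_PROTOCOL))

print("pickled array:", array_bytes, "bytes")
print("pickled sum:  ", sum_bytes, "bytes")
```

The array payload is at least the 8,000,000 bytes of raw float64 data, while the sum is a single scalar, which is why returning aggregates removes most of the inter-process overhead.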

OK, now for the sake of completeness, here's a version of the second example that uses multiprocessing with your original generator functions instead of the finer-scale functions shown above. This uses some tricks to spread the samples among multiple processes, which may make it unsuitable for many workflows. But using generators does seem to be slightly faster than using the finer-scale functions, and this method can get you up to a 54% speedup vs. the serial version shown above. However, that is only available if you don't need to return the full arrays from the worker functions.

import multiprocessing, itertools, math
import numpy as np

def f(xs):
    for x in xs:
        yield x + 1.0

def g(xs):
    for x in xs:
        yield x * 3

def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0,1,(500,2000))

def final():
    return f(g(h(xs())))

def final_sum():
    for x in f(g(h(xs()))):
        yield x.sum()

def get_chunk(args):
    """Retrieve n values (n=args[1]) from a generator function (f=args[0]) and return them as a list. 
    This runs in a worker process and does all the computation."""
    return list(itertools.islice(args[0](), args[1]))

def parallelize(gen_func, max_items, n_workers=4, chunk_size=50):
    """Pull up to max_items items from several copies of gen_func, in small groups in parallel processes.
    chunk_size should be big enough to improve efficiency (one copy of gen_func will be run for each chunk)
    but small enough to avoid exhausting memory (each worker will keep chunk_size items in memory)."""

    pool = multiprocessing.Pool(n_workers)

    # how many chunks will be needed to yield at least max_items items?
    n_chunks = int(math.ceil(float(max_items)/float(chunk_size)))

    # generate a suitable series of arguments for get_chunk()
    args_list = itertools.repeat((gen_func, chunk_size), n_chunks)

    # chunk_gen will yield a series of chunks (lists of results) from the generator function, 
    # totaling n_chunks * chunk_size items (which is >= max_items)
    chunk_gen = pool.imap_unordered(get_chunk, args_list)

    # parallel_gen flattens the chunks, and yields individual items
    parallel_gen = itertools.chain.from_iterable(chunk_gen)

    # limit the output to max_items items 
    return itertools.islice(parallel_gen, max_items)


# in this case, the parallel version is slower than a single process, probably
# due to overhead of gathering numpy arrays in imap_unordered (via pickle?)
print "serial, return arrays:"  # 15.3s
%time print sum(r.sum() for r in final())
print "parallel, return arrays:"  # 24.2s
%time print sum(r.sum() for r in parallelize(final, max_items=1000))


# in this case, the parallel version is more than twice as fast as the single-thread version
print "serial, return result:"  # 15.1s
%time print sum(r for r in final_sum())
print "parallel, return result:"  # 6.8s
%time print sum(r for r in parallelize(final_sum, max_items=1000))
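
One of the "tricks" is worth spelling out: every chunk starts a fresh copy of the generator function, so each chunk sees the first chunk_size items of that generator, not a continuation of the previous chunk. The sketch below (Python 3 syntax) shows this with a deterministic generator, `counter`, which is a stand-in invented for this example, and a plain list comprehension in place of `pool.imap_unordered` so the result is reproducible:

```python
import itertools
import math


def counter():
    # deterministic stand-in for a generator function such as final_sum
    for i in range(1000):
        yield i


def get_chunk(args):
    # same logic as get_chunk above: restart the generator, take n items
    gen_func, n = args
    return list(itertools.islice(gen_func(), n))


max_items, chunk_size = 10, 4

# 10 items in chunks of 4 needs ceil(10 / 4) = 3 chunks (12 items in total)
n_chunks = int(math.ceil(float(max_items) / float(chunk_size)))

# serial stand-in for pool.imap_unordered(get_chunk, args_list)
chunks = [get_chunk((counter, chunk_size)) for _ in range(n_chunks)]

# flatten the chunks and trim the surplus, as parallelize() does
items = list(itertools.islice(itertools.chain.from_iterable(chunks), max_items))
print(items)  # [0, 1, 2, 3, 0, 1, 2, 3, 0, 1]
```

With the random xs() generator this restart is harmless as long as each worker draws different random numbers, but for a generator that reads a fixed sequence of inputs you would need to pass each chunk its own offset.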
Matthias Fripp
0

Your example does not seem to run on my computer, although that may be because I'm running Windows (it has issues pickling anything not in the __main__ namespace, i.e. anything decorated). Would something like this help? (You would have to put package and unpack calls inside each of f(), g(), and h().)

Note: I'm not sure this would actually be any faster. It's just a stab at what others have suggested.

from multiprocessing import Process, freeze_support
from multiprocessing.sharedctypes import Value, Array
import numpy as np

def package(arr):
    shape = Array('i', arr.shape, lock=False)

    # pick the ctypes typecode that matches the array's dtype
    if arr.dtype == float:
        typecode = 'd'  # 'd' for double, 'f' for single
    elif arr.dtype == int:
        typecode = 'i'  # the checks could be avoided if the data is always the same type
    else:
        raise TypeError('unsupported dtype: %s' % arr.dtype)
    data = Array(typecode, arr.reshape(-1), lock=False)

    return data, shape

def unpack(data, shape):
    return np.array(data[:]).reshape(shape[:])

#test
def f(args):
    print(unpack(*args))

if __name__ == '__main__':
    freeze_support()

    a = np.array([1,2,3,4,5])
    a_packed = package(a)
    print('array has been packaged')

    p = Process(target=f, args=(a_packed,))
    print('passing to parallel process')
    p.start()

    print('joining to parent process')
    p.join()
    print('finished')
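
On the speed question: unpack() above builds a new array from data[:], which copies every element. A possible alternative, sketched below in Python 3 syntax as an assumption and not something I have benchmarked, is to wrap the shared buffer with np.frombuffer so that numpy reads and writes the shared memory directly. The names `raw` and `shared_view` are illustrative:

```python
from multiprocessing.sharedctypes import RawArray

import numpy as np

source = np.arange(6, dtype=np.float64).reshape(2, 3)

# allocate an unlocked shared buffer of doubles, one slot per element
raw = RawArray('d', source.size)

# wrap the buffer without copying; the ndarray is a view onto shared memory
shared_view = np.frombuffer(raw, dtype=np.float64).reshape(source.shape)
shared_view[:] = source  # one bulk copy into shared memory

# writes through the view land in the shared buffer itself
shared_view[0, 0] = 42.0
first_element = raw[0]
print(first_element)  # 42.0
```

A child process that receives `raw` as a Process argument could build the same kind of view on its side, so only the initial copy into shared memory is paid.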
Aaron
0

Check out the Pathos-multiprocessing project, which avoids the standard multiprocessing reliance on pickling. This should allow you to get around both the inefficiencies of pickling, and give you access to common memory for read-only shared resources. Note that while Pathos is nearing deployment in a full pip package, in the interim I'd recommend installing with pip install git+https://github.com/uqfoundation/pathos

emunsing