
I wanted to try different ways of using multiprocessing starting with this example:

$ cat multi_bad.py 
import multiprocessing as mp
from time import sleep
from random import randint

def f(l, t):
#   sleep(30)
    return sum(x < t for x in l)

if __name__ == '__main__':
    l = [randint(1, 1000) for _ in range(25000)]
    t = [randint(1, 1000) for _ in range(4)]
#   sleep(15)
    pool = mp.Pool(processes=4)
    result = pool.starmap_async(f, [(l, x) for x in t])
    print(result.get())

Here, l is a list that gets copied 4 times when 4 processes are spawned. To avoid that, the documentation suggests using queues, shared arrays, or proxy objects created with multiprocessing.Manager. To try the last option, I changed the definition of l:

$ diff multi_bad.py multi_good.py 
10c10,11
<     l = [randint(1, 1000) for _ in range(25000)]
---
>     man = mp.Manager()
>     l = man.list([randint(1, 1000) for _ in range(25000)])

The results still look correct, but the execution time has increased so dramatically that I think I'm doing something wrong:

$ time python multi_bad.py 
[17867, 11103, 2021, 17918]

real    0m0.247s
user    0m0.183s
sys     0m0.010s

$ time python multi_good.py 
[3609, 20277, 7799, 24262]

real    0m15.108s
user    0m28.092s
sys     0m6.320s

The docs do say that this way is slower than shared arrays, but this just feels wrong. I'm also not sure how I can profile this to get more information on what's going on. Am I missing something?

P.S. With shared arrays I get times below 0.25s.

P.P.S. This is on Linux and Python 3.3.

Lev Levitsky
  • In your real use case, are you modifying the gigabytes of data? or do the processes simply need to access the data? – unutbu Oct 29 '12 at 13:38
  • @unutbu Read-only would be fine. – Lev Levitsky Oct 29 '12 at 13:50
  • Then you do not need to pass the data as an argument to the worker function. Simply define the data *once* at the global level of the `__main__` module and all subprocesses will have access to it. You won't need a mp.Manager or mp.Array at all for this. – unutbu Oct 29 '12 at 13:52
  • @unutbu That's a very good point, thanks! Somehow I found that I could do so when playing with shared arrays (because passing them as arguments produced errors), but, to my shame, I failed to generalize that fact to other cases. But let's pretend that I do need write access, because I want it to be clear for me, too. – Lev Levitsky Oct 29 '12 at 13:57
  • @unutbu Actually, I just tried that (changing `def f(l, t)` to `def f(t)` and adjusting the asynchronous call) and it looks like each process stores the data, it's not shared. So the total memory usage is a multiple of that observed with a proxy object or shared array. Any thoughts? – Lev Levitsky Oct 29 '12 at 14:06
  • @unutbu But when I run the code shown above without any changes, child processes eat up twice as much memory. – Lev Levitsky Oct 29 '12 at 14:19
  • It seems like it's expected, though ([link](http://stackoverflow.com/a/659888/1258041)). I guess you meant that it would _work_ that way, but my primary concern is memory consumption. – Lev Levitsky Oct 29 '12 at 14:40
  • At least on Linux, I believe spawning subprocesses will not automatically consume much extra memory -- not a whole copy of `data` for each subprocess -- because Linux uses copy-on-write. I've posted some code to demonstrate/test this. – unutbu Oct 29 '12 at 20:14

2 Answers


Linux uses copy-on-write when subprocesses are os.forked. To demonstrate:

import multiprocessing as mp
import numpy as np
import logging
import os

logger = mp.log_to_stderr(logging.WARNING)

def free_memory():
    total = 0
    with open('/proc/meminfo', 'r') as f:
        for line in f:
            line = line.strip()
            if any(line.startswith(field) for field in ('MemFree', 'Buffers', 'Cached')):
                field, amount, unit = line.split()
                amount = int(amount)
                if unit != 'kB':
                    raise ValueError(
                        'Unknown unit {u!r} in /proc/meminfo'.format(u = unit))
                total += amount
    return total

def worker(i):
    x = data[i,:].sum()    # Exercise access to data
    logger.warning('Free memory: {m}'.format(m = free_memory()))

def main():
    procs = [mp.Process(target = worker, args = (i, )) for i in range(4)]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()

logger.warning('Initial free: {m}'.format(m = free_memory()))
N = 15000
data = np.ones((N,N))
logger.warning('After allocating data: {m}'.format(m = free_memory()))

if __name__ == '__main__':
    main()

which yielded

[WARNING/MainProcess] Initial free: 2522340
[WARNING/MainProcess] After allocating data: 763248
[WARNING/Process-1] Free memory: 760852
[WARNING/Process-2] Free memory: 757652
[WARNING/Process-3] Free memory: 757264
[WARNING/Process-4] Free memory: 756760

This shows that initially there was roughly 2.5GB of free memory. After allocating a 15000x15000 array of float64s, there was 763248 KB free. This roughly makes sense since 15000**2*8 bytes = 1.8GB and the drop in memory, 2.5GB - 0.763248GB is also roughly 1.8GB.

Now after each process is spawned, the free memory is again reported to be ~750MB. There is no significant decrease in free memory, so I conclude the system must be using copy-on-write.

Conclusion: If you do not need to modify the data, defining it at the global level of the __main__ module is a convenient and (at least on Linux) memory-friendly way to share it among subprocesses.
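
A minimal sketch of how the question's example might look with this approach, assuming Linux and the fork start method (requested explicitly here, because other start methods re-import the module instead of inheriting its memory). The names DATA, count_below and run are made up for illustration:

```python
import multiprocessing as mp
from random import randint

# Assumption: the data is read-only and is defined once at module level,
# before any worker process is created.
DATA = [randint(1, 1000) for _ in range(25000)]

def count_below(t):
    # Reads the module-level DATA inherited from the parent process.
    # Only t and the returned count are pickled and sent between processes.
    return sum(x < t for x in DATA)

def run(thresholds):
    # Assumption: the 'fork' start method is available (Linux).
    ctx = mp.get_context('fork')
    with ctx.Pool(processes=4) as pool:
        return pool.map(count_below, thresholds)

if __name__ == '__main__':
    print(run([randint(1, 1000) for _ in range(4)]))
```

The worker takes only the threshold as an argument, so the list itself is never passed through the pool's task queue.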

unutbu
  • It works the same way on my machine, and I even tried changing the direct use of `Process` to `Pool.apply_async`, and the results are still similar. I'm not sure what's different from my initial code. I also tried moving memory allocation inside the `if` block. – Lev Levitsky Oct 29 '12 at 21:26
  • OK, there is no difference :) Somehow `top` reports all processes to be using a lot of memory, and it's not shown as shared. – Lev Levitsky Oct 30 '12 at 11:36

This is to be expected, because accessing a shared object means having to pickle the request, send it through some kind of signal/syscall, unpickle the request, perform it, and return the result in the same way.

Basically, you should try to avoid sharing memory as much as you can. This leads to more debuggable code (because you have much less concurrency) and a greater speedup.

Shared memory should only be used if really needed (e.g. sharing gigabytes of data so that copying it would require too much RAM, or if the processes should be able to interact through this shared memory).

On a side note, using the Manager is probably much slower than a shared Array because the Manager must be able to handle any PyObject * and thus has to pickle/unpickle etc., while arrays can avoid much of this overhead.

From the multiprocessing documentation:

Managers provide a way to create data which can be shared between different processes. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.

So using a Manager means spawning a new process that is used just to handle the shared memory, which is probably why it takes so much more time.

If you profile the proxy, it is a lot slower than a non-shared list:

>>> import timeit
>>> import multiprocessing as mp
>>> man = mp.Manager()
>>> L = man.list(range(25000))
>>> timeit.timeit('L[0]', 'from __main__ import L')
50.490395069122314
>>> L = list(range(25000))
>>> timeit.timeit('L[0]', 'from __main__ import L')
0.03588080406188965
>>> 50.490395069122314 / _
1407.1701119638526

While an Array is not so much slower:

>>> L = mp.Array('i', range(25000))
>>> timeit.timeit('L[0]', 'from __main__ import L')
0.6133401393890381
>>> 0.6133401393890381 / 0.03588080406188965
17.09382371507359

Since these very elementary operations are slow, and I don't think there's much hope of speeding them up, if you have to share a big list of data and want fast access to it, you ought to use an Array.
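
For illustration, here is a rough sketch of the question's example using a shared array. It assumes read-only access (hence lock=False, which returns a raw ctypes array without a synchronizing wrapper) and the fork start method, so that workers inherit the array created at module level instead of receiving it as an argument. SHARED, count_below and run are hypothetical names:

```python
import multiprocessing as mp
from random import randint

# Assumption: 'fork' is available (Linux), so children inherit SHARED.
ctx = mp.get_context('fork')

# 'i' is the typecode for a C int. lock=False skips the lock wrapper,
# which is only reasonable here because the workers never write.
SHARED = ctx.Array('i', [randint(1, 1000) for _ in range(25000)], lock=False)

def count_below(t):
    # Iterates over the shared buffer directly; no manager process involved.
    return sum(x < t for x in SHARED)

def run(thresholds):
    with ctx.Pool(processes=4) as pool:
        return pool.map(count_below, thresholds)

if __name__ == '__main__':
    print(run([randint(1, 1000) for _ in range(4)]))
```

If the workers also had to write to the array, the default lock=True (and its get_lock() method) would be the safer starting point.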

Something that might speed things up a bit is accessing more than one element at a time (e.g. getting slices instead of single elements), but depending on what you want to do this may or may not be possible.
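
A small sketch of the slicing idea, with hypothetical helper names: the first function asks the manager for one element per request, while the second fetches the whole list with a single slice and then works on an ordinary local copy. The fork start method is assumed, and the timings depend on the machine, so none are shown:

```python
import multiprocessing as mp
import time

def count_below_elementwise(proxy, t):
    # Each proxy[i] is a separate request to the manager process.
    return sum(proxy[i] < t for i in range(len(proxy)))

def count_below_sliced(proxy, t):
    # A single request: slicing the proxy returns a plain local list.
    local = proxy[:]
    return sum(x < t for x in local)

def compare(n=2000, t=500):
    # Assumption: 'fork' is available (Linux).
    ctx = mp.get_context('fork')
    with ctx.Manager() as man:
        shared = man.list(range(n))
        return (count_below_elementwise(shared, t),
                count_below_sliced(shared, t))

if __name__ == '__main__':
    ctx = mp.get_context('fork')
    with ctx.Manager() as man:
        shared = man.list(range(25000))
        for func in (count_below_elementwise, count_below_sliced):
            start = time.perf_counter()
            func(shared, 500)
            print(func.__name__, time.perf_counter() - start)
```

Note that the slice is a copy, so this only helps when each process can afford to hold the slice in its own memory and does not need to see later changes made by others.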

Bakuriu
  • The use of a proxy object slows things down by a factor of 100, making the method useless. I was wondering if it's only so for the shown case or not. Maybe the right use case for proxy objects is a little different? And yes, I'm only looking into this because I need to share gigabytes of data between processes. – Lev Levitsky Oct 29 '12 at 12:47
  • If the shared Array is much faster than `Manager.list` why can't you use it? Do you really need the flexibility of a `list`? – Bakuriu Oct 29 '12 at 12:49
  • I want to know how much effort I have to put into refactoring an existing application. Changing to use a list proxy is straightforward. Changing to shared arrays is probably possible, but more complicated. I want to make sure I understand what's going on before doing this. – Lev Levitsky Oct 29 '12 at 12:56
  • I've added a bit more explanation. How much effort you'll have to put into refactoring depends on the code that uses this list. If it uses many `list` features that are not present in the `Array` then you'll have to change a lot of code, but if you already had a homogeneous sequence then you probably won't need to change much. – Bakuriu Oct 29 '12 at 13:08