
I want to use Python's multiprocessing module for the following: Map an input line to a list of integers and calculate the sum of this list.

The input line is initially a string where the items to be summed are separated by spaces.

What I have tried is this:

from itertools import imap

my_input = '1000000000 ' * int(1e6)
print sum(imap(int, my_input.split()))

This takes about 600ms on my machine, but I would like to make it faster with multiprocessing.

It seems that the bottleneck is in the mapping part, since sum is pretty fast when applied to a ready-made list of integers:

>>> int_list = [int(1e9)] * int(1e6)
>>> %time sum(int_list)
CPU times: user 7.38 ms, sys: 5 µs, total: 7.38 ms
Wall time: 7.4 ms
1000000000000000
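
For reference, here is a rough breakdown of where the single-process time goes (a sketch only; timings vary by machine):

import time

my_input = '1000000000 ' * int(1e6)

t0 = time.time()
parts = my_input.split()       # tokenise the string
t1 = time.time()
ints = list(map(int, parts))   # the int() mapping, the suspected bottleneck
t2 = time.time()
total = sum(ints)              # the cheap part
t3 = time.time()

print('split: %.3fs, int map: %.3fs, sum: %.3fs' % (t1 - t0, t2 - t1, t3 - t2))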

I tried to apply the instructions from this question, but as I'm quite new to multiprocessing, I couldn't fit them to this problem.

  • What OS are you using? One of the bigger problems is that splitting the string will be time consuming, as will sending the split substrings to the process pool. On Unix, you can avoid directly sending the substrings to the child process, and even delegate the splitting to the child process. – Dunes Aug 31 '16 at 16:39
  • @Dunes I'm using OS X. How can I avoid sending the substrings and delegate the splitting? –  Aug 31 '16 at 17:01
  • `multiprocessing` won't help here due to serialisation overhead. You should only use `multiprocessing` to improve performance when one call of your target function is very expensive, far more expensive than a call to `pickle.dump` given your machine's IO performance. – Eli Korvigo Aug 31 '16 at 20:04
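
A rough way to sanity-check that last comment is to compare the cost of one work item against the cost of serialising it (a sketch; exact numbers vary by machine):

import timeit

n = 100000
t_int = timeit.timeit("int('1000000000')", number=n) / n
t_pkl = timeit.timeit("pickle.dumps('1000000000')",
                      setup="import pickle", number=n) / n

print("int() per call:        %.2e s" % t_int)
print("pickle.dumps per call: %.2e s" % t_pkl)
# if the two costs are comparable, multiprocessing has little to gain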

2 Answers


So, this seems to roughly boil down to three steps:

  1. Make a pool
  2. Map int() across the list within that pool
  3. Sum the results.

So:

if __name__ == '__main__':
    import multiprocessing
    my_input = '1000000000 ' * int(1e6)
    string_list = my_input.split()
    # Pool over all CPUs
    int_list = multiprocessing.Pool().map(int, string_list)
    print sum(int_list)

It may be more time-efficient to use generators where possible:

if __name__ == '__main__':
    import multiprocessing
    import re
    my_input = '1000000000 ' * int(1e6)
    # use a regex iterator matching each run of non-whitespace plus its trailing space
    string_list = (x.group(0) for x in re.finditer(r'[^\s]+\s', my_input))
    # Pool over all CPUs
    int_list = multiprocessing.Pool().imap(int, string_list)
    print sum(int_list)

The regex will likely be slower than split, but using re.finditer should allow the Pool to start mapping as soon as individual tokens are matched, and using imap rather than map should do the same for sum (allowing it to start adding numbers as they become available). Credit to this answer for the re.finditer idea.
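
If per-item overhead is the bottleneck, Pool.imap's optional chunksize argument can batch many small int() calls into each inter-process message; a minimal sketch (the chunksize value here is an arbitrary guess to tune):

if __name__ == '__main__':
    import multiprocessing
    my_input = '1000000000 ' * int(1e6)
    pool = multiprocessing.Pool()
    # each chunk of 10000 strings is pickled and sent in one go,
    # amortising the serialisation cost over many int() calls
    print(sum(pool.imap(int, my_input.split(), chunksize=10000)))
    pool.close()
    pool.join()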

It may or may not be more efficient to multiprocess than to do everything in a single process. You might lose more time making new processes and passing their results back than you gain from doing things in parallel. The same goes for trying to put the adding into the pool as well.

On the system I'm testing this on, which has two CPUs, I get the one-process solution to run in about half a second, the non-generator multiprocess solution in about 1 second, and the generator solution in 12-13 seconds.

Vivian
  • Thank you! Apart from missing quotes around __main__ this worked, but as you implied, this was actually much slower (execution now takes about 3 seconds). –  Aug 31 '16 at 15:09
  • @starkbot Fixed quotes. Also, was working on other additions when you posted that - try the iterator version? Might vary by the number of CPUs available. – Vivian Aug 31 '16 at 15:19
  • I think it's not very surprising that the `multiprocessing`-based version is a lot slower given that the only thing you can do concurrently here is a single `int` call. :-] – Frerich Raabe Aug 31 '16 at 15:20
  • @FrerichRaabe Well, yes. This makes a good basic example of how to use multiprocessing and pools, though, even if the specific job isn't a good one for it. – Vivian Aug 31 '16 at 15:22
  • @DavidHeyman the second version gives this error: ValueError: invalid literal for int() with base 10: '' –  Aug 31 '16 at 15:29
  • @FrerichRaabe isn't it possible to divide the sum calculation across several threads? –  Aug 31 '16 at 15:33
  • @starkbot Oops - yeah, was accidentally matching the whitespace instead of the non-whitespace. Correcting now. – Vivian Aug 31 '16 at 15:38

Using a feature of Unix systems called forking, you can read (not write) data from the parent process with essentially zero overhead. Normally, you would have to copy the data over to each worker, but a forked process on Unix shares the parent's memory copy-on-write, which lets you circumvent this.

Using this, the job in the pool can access the whole input string and extract the part that it will work on. It can then split and parse this section of the string on its own and return the sum of the integers in its section.

from multiprocessing import Pool, cpu_count
from time import time


def serial(data):
    return sum(map(int, data.split()))


def parallel(data):
    processes = cpu_count()

    with Pool(processes) as pool:
        args = zip(
            ["input_"] * processes, # name of global to access
            range(processes), # job number
            [processes] * processes # total number of jobs 
        )
        return sum(pool.map(job, args, chunksize=1))


def job(args):
    global_name, job_number, total_jobs = args
    data = globals()[global_name]
    chunk = get_chunk(data, job_number, total_jobs)

    return serial(chunk)


def get_chunk(string, job_number, total_jobs):
    """This function may mess up if the number of integers in each chunk is low (1-2).
    It also assumes there is only 1 space separating integers."""
    approx_chunk_size = len(string) // total_jobs

    # initial estimates
    start = approx_chunk_size * job_number
    end = start + approx_chunk_size

    if start and not string.startswith(" ", start - 1):
        # if string[start] is not beginning of a number, advance to start of next number
        start = string.index(" ", start) + 1

    if job_number == total_jobs - 1:
        # last job
        end = None
    elif not string.startswith(" ", end - 1):
        # if string[end] is part of a number, then advance to end of number
        end = string.index(" ", end - 1)

    return string[start:end]


def timeit(func, *args, **kwargs):
    "Simple timing function"
    start = time()
    result = func(*args, **kwargs)
    end = time()
    print("{} took {} seconds".format(func.__name__, end - start))
    return result


if __name__ == "__main__":
#    from multiprocessing.dummy import Pool # uncomment this for testing

    input_ = "1000000000 " * int(1e6)

    actual = timeit(parallel, input_)
    expected = timeit(serial, input_)
    assert actual == expected
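
As a quick sanity check of get_chunk's boundary handling (with the last-job condition fixed to total_jobs - 1 above), a tiny hypothetical input splits cleanly on number boundaries:

>>> [get_chunk("12 34 56", job, 3) for job in range(3)]
['12', '34', '56']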
Dunes