
The code I'm using is posted below. I'm running Ubuntu 16.04 on a laptop with a quad-core i7 processor. "data" is a matrix with ~100,000 rows and 4 columns, and "eemd" is a computationally expensive function. On my machine, processing all columns takes 5 minutes, regardless of whether I process the columns one after another or use Pool.map(), as shown below.

I have run other examples from this site in which Pool.map() cut the run time by a factor roughly equal to the number of processes, but I don't see that here and I can't figure out why.

The result is the same whether I use Pool.map() or Pool.imap().

#!/usr/bin/python

import time

from pyeemd import eemd
import numpy as np
import linecache

data = np.loadtxt("test_data.txt")
idx = range(4)

def eemd_sans_multi():
    t = time.time()

    for i in idx:
        eemd(data[:,i])

    print("Without multiprocessing...")
    print(time.time() - t)

def eemd_wrapper(idx):
    imfs = eemd(data[:,idx])
    return imfs

def eemd_with_multi():
    import multiprocessing as mp

    pool = mp.Pool(processes=4)

    t = time.time()

    for x in pool.map(eemd_wrapper, idx):
        print(x)

    print("With multiprocessing...")
    print(time.time() - t)


if __name__ == "__main__":
    eemd_sans_multi()
    eemd_with_multi()

New Code Based on Dunes' Reply

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import ctypes
from time import time

from pyeemd import eemd
import numpy as np
import re
import linecache

data = np.loadtxt("test_data.txt",skiprows=8)
headers = re.split(r'\t+',linecache.getline("test_data.txt", 8))

idx = [i for i, x in enumerate(headers) if x.endswith("Z")]
idx = idx[0:2]
print(idx)

def eemd_wrapper(idx):
    imfs = eemd(data[:,idx])
    return imfs

def main():
    print("serial")
    start = time()
    for i in idx:
        eemd_wrapper(i)
    end = time()
    print("took {} seconds\n".format(end-start))

    for executor_class in (ThreadPoolExecutor, ProcessPoolExecutor):
        print(executor_class.__name__)
        start = time()
        # we'll only be using two workers so as to make time comparisons simple
        with executor_class(max_workers=2) as executor:
            executor.map(eemd_wrapper, idx)
        end = time()
        print("took {} seconds\n".format(end-start))

if __name__ == '__main__':
    main()
Alfred
  • How long does it take with and without multiprocessing? From what I've read and in my experience, multiprocessing is faster than serial processing only if the tasks you are splitting across processes are long; otherwise the overhead "eats" the advantage. [Here](https://stackoverflow.com/questions/8775475/python-using-multiprocess-is-slower-than-not-using-it) is a related question – gionni Aug 01 '17 at 07:15
  • Both with and without multiprocessing, it takes 5 minutes. That is my primary problem here - I want to use multiprocessing to speed things up, but the code above is not working. Running each column in order takes just as long as my attempt at running them in parallel. – Alfred Aug 01 '17 at 07:20
  • @Alfred you can see this post https://stackoverflow.com/a/11196615 – Pritam Pan Aug 01 '17 at 07:24
  • @Alfred : not sure this would make any difference, but have you tried passing the data you are working on to your processes, e.g. make `def eemd_wrapper(data, idx)` (or just the already sliced data). It's kind of a blind guess since my knowledge of multiprocessing and the forking underneath it is quite shallow – gionni Aug 01 '17 at 08:35
  • @gionni : I've tried that, it's not easy to do. Pool.map() can only take one input, so I tried making a partial with the data already passed to the wrapper, but that led to more issues I couldn't solve and so I did this. – Alfred Aug 01 '17 at 12:36
  • @Pritam Pan : Thanks, I will try to implement that, but I would also very much like to understand why this code in particular doesn't work. – Alfred Aug 01 '17 at 12:37
  • @Alfred : you can use `Pool.starmap()` – gionni Aug 01 '17 at 12:41
  • @gionni : Do you mean I should use Pool.starmap() to use multiple inputs? Do you think it would vastly improve the multiprocessing speed of my code above just by making that change? – Alfred Aug 01 '17 at 12:50
  • @Alfred : I really don't know, but since it is very fast and easy to implement you might want to try it, unless you have time, then it would certainly be better to dig into multiprocessing (which I never had time to do) – gionni Aug 01 '17 at 12:53
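
For reference, the two ways of passing more than one argument mentioned in these comments can be sketched roughly as follows. This is an illustration only, not the asker's code: `column_sum` is a hypothetical stand-in for an `eemd_wrapper(data, idx)`, the data is a tiny list of lists, and `multiprocessing.pool.ThreadPool` is used because it exposes the same `map`/`starmap` interface as `Pool` without requiring anything to be pickled.

```python
from functools import partial
from multiprocessing.pool import ThreadPool

def column_sum(data, idx):
    """Hypothetical stand-in for eemd_wrapper(data, idx): sum column idx."""
    return sum(row[idx] for row in data)

# Hypothetical data: 3 rows x 4 columns instead of the real matrix.
data = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]]
columns = range(4)

with ThreadPool(processes=2) as pool:
    # Option 1: bind the data first, so map() only has to supply idx.
    via_partial = pool.map(partial(column_sum, data), columns)
    # Option 2: starmap() unpacks each (data, idx) tuple into two arguments.
    via_starmap = pool.starmap(column_sum, [(data, i) for i in columns])

print(via_partial)
print(via_starmap)
```

With a real `multiprocessing.Pool`, the function and its arguments would also have to be picklable, and the data would be copied to the workers. Either option only changes how arguments are passed, not how the work is scheduled, so on its own it would not be expected to change the timing.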

2 Answers


In Python 3, you can try ProcessPoolExecutor from the concurrent.futures module. Here is an example:

from time import time
from concurrent.futures import ProcessPoolExecutor


def gcd(pair):
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i


numbers = [(1963309, 2265973), (2030677, 3814172),
           (1551645, 2229620), (2039045, 2020802), (6532541, 9865412)]
start = time()
results = list(map(gcd, numbers))
end = time()
print('1st Took %.3f seconds' % (end - start))
start = time()
pool = ProcessPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))
end = time()
print('2nd Took %.3f seconds' % (end - start))
rsu8
  • Hey, thanks for the reply. I used the concurrent.futures idea and edited new code into the original post, but I'm not seeing large decreases in the time it took to run - only about 13%. Any ideas why that might be? – Alfred Aug 01 '17 at 17:42

Major Edit

It looks like libeemd is already multi-threaded. You will not gain significant performance increases from parallel execution in Python. You've stated you're using Ubuntu 16.04, which means you will have compiled libeemd with gcc 5.4 (which supports OpenMP). The Makefile of libeemd shows it's compiled with -fopenmp. So yes, it's already multi-threaded.
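
One rough way to check whether a call is already using several cores is to compare CPU time with wall-clock time for a single call. The sketch below is an illustration under stated assumptions: `cpu_to_wall_ratio` and `busy` are hypothetical helper names, and `busy` is a single-threaded pure-Python baseline rather than `eemd` itself.

```python
import time

def cpu_to_wall_ratio(func, *args):
    """Run func(*args) once and return (result, cpu_seconds / wall_seconds).

    time.process_time() counts CPU time across all threads of this process,
    so a ratio well above 1 suggests the call used more than one core.
    """
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    result = func(*args)
    cpu_used = time.process_time() - cpu_start
    wall_used = time.perf_counter() - wall_start
    return result, cpu_used / wall_used

def busy(n):
    """Single-threaded pure-Python work, used here only as a baseline."""
    total = 0
    for i in range(n):
        total += i
    return total

result, ratio = cpu_to_wall_ratio(busy, 2_000_000)
print("CPU/wall ratio: {:.2f}".format(ratio))
```

Passing `eemd` and one column of the data in place of `busy` (untested here) should give a ratio noticeably above 1 if the library's OpenMP threads are active, and a ratio near 1 if it is running single-threaded.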

That the library is already multi-threaded also explains why ProcessPoolExecutor runs into problems in the example code. The library has already been used before the process pool is invoked, and the default way Unix systems create new processes (forking) is to make a pseudo-copy of the parent process. So the child workers are left with a library that refers to threads in the parent process. If you run the ProcessPoolExecutor on its own, you will see it works fine.
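
If inherited state from a forked parent is indeed the cause, one possible workaround is to start the workers with the "spawn" method, so that each child is a fresh interpreter. The sketch below assumes Python 3.7 or later (where `ProcessPoolExecutor` accepts `mp_context`) and has not been tested against pyeemd; `square` and `run_with_spawn` are hypothetical names, with `square` standing in for the real job.

```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

def square(x):
    """Hypothetical placeholder for the real job (e.g. an eemd wrapper)."""
    return x * x

def run_with_spawn(items, max_workers=2):
    """Map square over items using workers started with 'spawn', not fork."""
    ctx = mp.get_context("spawn")
    with ProcessPoolExecutor(max_workers=max_workers, mp_context=ctx) as ex:
        return list(ex.map(square, items))
```

Because spawned children re-import the main module, a call such as `run_with_spawn([1, 2, 3, 4])` should sit under an `if __name__ == "__main__":` guard in a script. Spawned workers also do not inherit module-level data for free, so anything loaded at import time is loaded again in each child.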

Original Answer

Given that pyeemd is a wrapper for libeemd using ctypes as glue, you shouldn't need to use multi-processing -- a multi-threading solution should be enough to get a speed boost (and the best speed boost at that).

Why threads?

Multi-processing is often used in place of multi-threading in Python when the task is CPU bound. This is because of the Global Interpreter Lock (GIL), which is essential for performance in single-threaded Python. However, the GIL makes multi-threaded pure Python code run as if it were single threaded.

However, when a thread enters a C function through the ctypes module it releases the GIL as the function will not need to execute Python code. Python types are converted into C types for the call and numpy arrays are wrappers around C buffers (which are guaranteed to exist for the duration of the function). So the Python interpreter and its GIL just aren't needed.
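
A small sketch of this behaviour, assuming a POSIX system where `ctypes.CDLL(None)` exposes the C library's `usleep`: two threads each block inside a C call, and the calls overlap because the GIL is dropped for their duration. Sleeping is not CPU-bound work, so this shows only that the GIL is released, not how a computation would scale; `c_sleep` is a hypothetical helper name.

```python
import ctypes
import time
from concurrent.futures import ThreadPoolExecutor

# Assumption: a POSIX system where CDLL(None) exposes the C library's usleep.
libc = ctypes.CDLL(None)
libc.usleep.argtypes = (ctypes.c_uint,)
libc.usleep.restype = ctypes.c_int

def c_sleep(microseconds):
    """Block inside a C function; ctypes.CDLL drops the GIL for the call."""
    return libc.usleep(microseconds)

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(c_sleep, [200_000, 200_000]))
elapsed = time.perf_counter() - start

# If the GIL were held during the calls, the two sleeps could not overlap.
print("two 0.2 s calls took {:.2f} s".format(elapsed))
```

Loading the same library through `ctypes.PyDLL` instead keeps the GIL held during each call, which is the case to avoid when threads are meant to run C code side by side.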

Multi-processing is a good way to get a speed boost if using pure Python, but one of its pitfalls is the need to send data to the child workers and return the result to the parent. If either of these takes up a significant amount of memory, then pushing the data backwards and forwards adds a large overhead. So, why use multi-processing if you don't need to?
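
To get a feel for that overhead, the sketch below measures how large a payload becomes when pickled, which is how arguments and results travel between processes. It is an illustration under assumptions: the standard-library `array` module stands in for a numpy matrix so the example stays self-contained, and the 100,000 x 4 shape is taken from the question.

```python
import pickle
import time
from array import array

# Stand-in for the question's matrix: 100,000 rows x 4 columns of 8-byte floats.
payload = array("d", [0.0]) * (100_000 * 4)

start = time.perf_counter()
blob = pickle.dumps(payload, protocol=pickle.HIGHEST_PROTOCOL)
restored = pickle.loads(blob)
elapsed = time.perf_counter() - start

print("pickled size: {:.1f} MB".format(len(blob) / 1e6))
print("round trip:   {:.4f} s".format(elapsed))
```

Every task and every returned result pays a cost of this kind with a process pool, whereas threads share the same memory and skip it entirely.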

Example

Here we're going to test how long it takes to complete a long-running C function 4 times. This will be done once in serial, once using two worker threads and once using two worker processes. This will show that multi-threading is as good as (if not better than) multi-processing when the bulk of the work is done in a C library. lengthy.c is just an example; any deterministic but expensive function called with identical arguments will do.

lengthy.c

#include <stdint.h>

double lengthy(uint64_t n) {
    double total = 0;
    for (uint64_t i = 0; i < n; ++i) {
        total += i;
    }
    return total;
}

Turn the code into a library that can be loaded by ctypes

dunes@dunes-VM:~/src$ gcc -c -Wall -Werror -fpic lengthy.c
dunes@dunes-VM:~/src$ gcc -shared -Wl,-soname,liblengthy.so -o liblengthy.so lengthy.o -lc

time_lengthy.py

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import ctypes
from time import time

# create a handle to the C function lengthy
liblengthy = ctypes.cdll.LoadLibrary('./liblengthy.so')
lengthy = liblengthy.lengthy
lengthy.argtypes = (ctypes.c_uint64,)
lengthy.restype = ctypes.c_double

def job(arg):
    """This function is only necessary as lengthy itself cannot be pickled, and
    therefore cannot be directly used with a ProcessPoolExecutor.
    """
    return lengthy(arg)

def main():
    n = 1 << 28
    # 1 << 28 was chosen because it takes approximately 1 second on my machine
    # Feel free to choose any value where 0 <= n < (1 << 64)
    items = [n] * 4  # 4 jobs to do
    print("serial")
    start = time()
    for i in items:
        job(i)
    end = time()
    print("took {} seconds\n".format(end-start))

    for executor_class in (ThreadPoolExecutor, ProcessPoolExecutor):
        print(executor_class.__name__)
        start = time()
        # we'll only be using two workers so as to make time comparisons simple
        with executor_class(max_workers=2) as executor:
            executor.map(job, items)
        end = time()
        print("took {} seconds\n".format(end-start))

if __name__ == '__main__':
    main()

Which, when run gives:

dunes@dunes-VM:~/src$ python3 multi.py 
serial
took 4.936346530914307 seconds

ThreadPoolExecutor
took 2.59773850440979 seconds

ProcessPoolExecutor
took 2.611887216567993 seconds

We can see that the two threads/processes running in parallel were nearly twice as fast as the single thread running serially. However, the threads won't suffer the overhead of sending data back and forth between parent and child workers. So you might as well use threads, as the pyeemd source shows it doesn't do any significant work in pure Python.
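
The timing script above discards the return values. In real use you would probably want to keep them, which might look like the sketch below. `process_column` is a hypothetical stand-in for `eemd(data[:, i])`, and the columns are made-up lists rather than the real data.

```python
from concurrent.futures import ThreadPoolExecutor

def process_column(column):
    """Hypothetical stand-in for eemd(data[:, i]); returns a small summary."""
    return min(column), max(column)

# Hypothetical data: four short columns instead of the real 100,000-row matrix.
columns = [[3, 1, 2], [9, 7, 8], [5, 4, 6], [0, 11, 10]]

with ThreadPoolExecutor(max_workers=2) as executor:
    # executor.map yields results in input order, regardless of finish order.
    results = list(executor.map(process_column, columns))

for i, (lo, hi) in enumerate(results):
    print("column {}: min={}, max={}".format(i, lo, hi))
```

Wrapping `executor.map` in `list()` also re-raises any exception from a worker, which would otherwise go unnoticed when the results are never read.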

Dunes
  • Hey, thanks so much for your reply. I modified your code for use with my data and needs, and posted my new code in the original post above. However, these are the results that I got: serial took 428.560495138 seconds, ThreadPoolExecutor took 373.581459045 seconds, ProcessPoolExecutor. Why do I not see the 2x decrease in time? – Alfred Aug 01 '17 at 17:34
  • ProcessPoolExecutor didn't even finish, I canceled it because it was already taking way longer than the other two. – Alfred Aug 01 '17 at 17:54
  • That you get a slight boost with threads is great, but that's the best you're going to get. See the edit for details. – Dunes Aug 02 '17 at 12:20
  • I was actually able to get both the multiprocessing and multithreading examples working. The multithreading example was 69% faster than running in series, while multiprocessing was ~65% faster, which corresponds to the time_lengthy.py example you posted. I'll post updated code. – Alfred Aug 07 '17 at 03:18
  • Also, ProcessPoolExecutor was having problems because I was running it right after ThreadPoolExecutor. I was running my serial, thread, and process functions sequentially for testing, but for some reason, doing this causes the ProcessPoolExecutor to never finish. I had to end all of the Python processes created by multithreading, then run the multiprocessing example to make it work. – Alfred Aug 07 '17 at 03:28