
I am experimenting with Python multiprocessing using Python 3.7.4 on Windows 10, with an Intel Core i7-8550U processor.
I am testing multiprocessing with two functions: one that simply calls sleep() and one that uses matthews_corrcoef from sklearn. Multiprocessing gives a speed-up with the sleep function, but not with the sklearn function.

import numpy as np
from sklearn.metrics import matthews_corrcoef
import time
import concurrent.futures
from multiprocessing import Process, Pool
from functools import partial
import warnings
import sys

class Runner():
  def sleeper(self, pred, man, thr = None):
    return time.sleep(2)

  def mcc_score(self, pred, man, thr = None):
    warnings.filterwarnings("ignore")
    return matthews_corrcoef(pred, man)

  def pool(self, func):
    t1 = time.perf_counter()
    p = Pool()
    meth = partial(func, pred, man)
    res = p.map(meth, thres)
    p.close()

    t2 = time.perf_counter()
    print(f'Pool {func.__name__} {round((t2-t1), 3)} seconds')

  def vanilla(self, func):
    t1 = time.perf_counter()
    for t in thres:
      func(pred, man)
    t2 = time.perf_counter()
    print(f'vanilla {func.__name__} {round((t2-t1), 3)} seconds')

if __name__== "__main__":
    print(sys.version)
    r = Runner()
    thres = np.arange(0,1, 0.3)
    print(f"Number of thresholds {len(thres)}")
    pred = [1]*200000
    man = [1]*200000
    results = []

    r.pool(r.mcc_score)
    r.vanilla(r.mcc_score)

    r.pool(r.sleeper)
    r.vanilla(r.sleeper)

On Windows, pooling the mcc_score function is actually slower than the vanilla version, whereas on Linux it behaves as expected.

Here are the sample outputs:

#windows
3.7.4 (default, Aug  9 2019, 18:34:13) [MSC v.1915 64 bit (AMD64)]
Number of thresholds 4
Pool mcc_score 3.247 seconds
vanilla mcc_score 1.591 seconds
Pool sleeper 5.828 seconds
vanilla sleeper 8.001 seconds

#linux
3.7.0 (default, Jun 28 2018, 13:15:42) [GCC 7.2.0]
Number of thresholds 34
Pool mcc_score 1.946 seconds
vanilla mcc_score 8.817 seconds

I went through the documentation and other relevant questions on Stack Overflow, which mainly stress guarding the entry point with if __name__== "__main__":. Some help would be greatly appreciated, as I have been stuck on this for quite some time. If I missed any important information, please mention it and I will provide it.

  • Is it possible numpy is already doing multi threading under the hood? Try disabling it per this question maybe: https://stackoverflow.com/questions/17053671/python-how-do-you-stop-numpy-from-multithreading – Pace Nov 09 '19 at 21:19
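Following the suggestion in the comment above, the BLAS/OpenMP thread pools can be limited via environment variables, which must be set before numpy is first imported; a sketch (the exact variable names depend on the BLAS backend; these cover the common ones):

```python
import os

# These must be set before numpy (or sklearn, which imports numpy) is loaded,
# otherwise the BLAS thread pools are already initialized.
os.environ["OMP_NUM_THREADS"] = "1"        # OpenMP
os.environ["OPENBLAS_NUM_THREADS"] = "1"   # OpenBLAS
os.environ["MKL_NUM_THREADS"] = "1"        # Intel MKL

import numpy as np  # imported only after the limits are in place
```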

1 Answer


First, I am going to simplify your code. Since the methods in your class never use any instance state, I will skip the class approach and use plain functions.

The starting point is the example from the documentation for multiprocessing. To see the benefit of using Pool, I add two seconds of sleep and print a timestamp.

import datetime
from multiprocessing import Pool
import time

def fx(x):
    time.sleep(2)
    print(datetime.datetime.utcnow())
    return x*x

if __name__ == '__main__':
    with Pool() as p:
        print(p.map(fx, range(10)))

The output is as expected

2019-11-10 11:10:05.346985
2019-11-10 11:10:05.363975
2019-11-10 11:10:05.418941
2019-11-10 11:10:05.435931
2019-11-10 11:10:07.347753
2019-11-10 11:10:07.364741
2019-11-10 11:10:07.419707
2019-11-10 11:10:07.436697
2019-11-10 11:10:09.348518
2019-11-10 11:10:09.365508
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

As I did not specify the number of processes, all available cores are used (on my machine, 4). This can be seen in the timestamps: four timestamps lie close to each other, then execution pauses until the cores are freed again.
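The pool size can also be set explicitly via the processes argument; a minimal sketch (square is a stand-in function for illustration):

```python
import os
from multiprocessing import Pool

def square(x):
    # Trivial stand-in task, just for illustration.
    return x * x

if __name__ == "__main__":
    print(os.cpu_count())               # the default pool size used by Pool()
    with Pool(processes=2) as p:        # cap the pool at two worker processes
        print(p.map(square, range(6)))  # [0, 1, 4, 9, 16, 25]
```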

You want to use the method matthews_corrcoef, which according to its documentation takes two arguments, y_true and y_pred.

Before using that method, let's modify the test method from above to take two arguments:

def fxy(x, y):
    time.sleep(2)
    print(datetime.datetime.utcnow())
    return x*y

From the documentation for multiprocessing.pool.Pool we learn that the function passed to map may take only one argument. So I am going to use apply_async instead. As apply_async returns result objects instead of the return values of the method, I store the result objects in a list and fetch the return values in a separate loop:

if __name__ == '__main__':
    with Pool() as p:
        res = []
        for i in range(10):
            res.append(p.apply_async(fxy, args = (i, i)))
        for item in res:
            print(item.get())

This gives a similar output to the first approach:

2019-11-10 11:41:24.987093
0
2019-11-10 11:41:24.996087
1
2019-11-10 11:41:25.008079
2019-11-10 11:41:25.002083
4
9
2019-11-10 11:41:26.988859
16
2019-11-10 11:41:27.009847
2019-11-10 11:41:27.009847
25
36
2019-11-10 11:41:27.011845
49
2019-11-10 11:41:28.989623
64
2019-11-10 11:41:29.019606
81
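For the multi-argument case there is also Pool.starmap (available since Python 3.3), which unpacks each tuple of arguments for you, so no apply_async bookkeeping is needed; a minimal sketch:

```python
from multiprocessing import Pool

def fxy(x, y):
    return x * y

if __name__ == "__main__":
    # Each tuple in the iterable is unpacked into fxy's two positional arguments.
    with Pool() as p:
        print(p.starmap(fxy, [(i, i) for i in range(10)]))  # [0, 1, 4, ..., 81]
```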

Now for matthews_corrcoef. For easier verification of the results (your pred and man throw errors when passed to matthews_corrcoef), I use the nomenclature and values from the example in the documentation of matthews_corrcoef.

import datetime
from multiprocessing import Pool
import numpy as np
from sklearn.metrics import matthews_corrcoef

def mcc_score(y_true, y_pred): 
    print(datetime.datetime.utcnow())
    return matthews_corrcoef(y_true, y_pred)

y_true = [+1, +1, +1, -1]
y_pred = [+1, -1, +1, +1]

if __name__ == '__main__':
    with Pool() as p:
        res = []
        for i in range(10):
            res.append(p.apply_async(mcc_score, args = (y_true, y_pred)))
        for item in res:
            print(item.get())

The results are as expected:

2019-11-10 11:49:07.309389
2019-11-10 11:49:07.345366
2019-11-10 11:49:07.375348
2019-11-10 11:49:07.393336
2019-11-10 11:49:07.412325
2019-11-10 11:49:07.412325
2019-11-10 11:49:07.412325
-0.3333333333333333
-0.3333333333333333
-0.3333333333333333
-0.3333333333333333
-0.3333333333333333
-0.3333333333333333
-0.3333333333333333
2019-11-10 11:49:07.420319
2019-11-10 11:49:07.420319
2019-11-10 11:49:07.413325
-0.3333333333333333
-0.3333333333333333
-0.3333333333333333
mdk
  • thank you very much for the response... i tried your suggestions but in the end I ended up writing my own version of calculating mcc for different thresholds.... in sklearn they are always calculating the confusion matrix, so its really slow. I added dynamic programming to solve this repetitive task, thus making it a lot faster. – adib.mosharrof Nov 21 '19 at 18:09
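For reference, the speed-up described in the last comment comes from avoiding sklearn's confusion-matrix machinery on every call; once the four confusion-matrix counts are known, MCC follows from the standard formula. A minimal sketch (the per-threshold counting and caching the commenter mentions are left out, and mcc_from_counts is a hypothetical helper name):

```python
import math

def mcc_from_counts(tp, tn, fp, fn):
    # Matthews correlation coefficient from raw confusion-matrix counts:
    # (TP*TN - FP*FN) / sqrt((TP+FP)(TP+FN)(TN+FP)(TN+FN))
    denom = math.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
    if denom == 0:
        # Degenerate case: an entire row or column of the confusion matrix is zero.
        return 0.0
    return (tp * tn - fp * fn) / denom

# The answer's example y_true = [+1, +1, +1, -1], y_pred = [+1, -1, +1, +1]
# has tp=2, tn=0, fp=1, fn=1:
print(mcc_from_counts(2, 0, 1, 1))  # -0.3333333333333333, matching matthews_corrcoef
```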