
I am experimenting with the use of concurrent.futures.ProcessPoolExecutor to parallelize a serial task. The serial task involves finding the number of occurrences of a given number in a number range. My code is shown below.
During its execution, I noticed from Task Manager / System Monitor / top that only one cpu/thread is constantly in operation, despite giving the max_workers of ProcessPoolExecutor a value greater than 1. Why is this the case? How can I parallelize my code using concurrent.futures? My code was executed with Python 3.5.

import concurrent.futures as cf
from time import time

def _findmatch(nmax, number):    
    print('def _findmatch(nmax, number):')
    start = time()
    match=[]
    nlist = range(nmax)
    for n in nlist:
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {} in {}sec".format(len(match),end))
    return match

def _concurrent(nmax, number, workers):
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        start = time()
        future = executor.submit(_findmatch, nmax, number)
        futures = future.result()
        found = len(futures)
        end = time() - start
        print('with statement of def _concurrent(nmax, number):')
        print("found {} in {}sec".format(found, end))
    return futures

if __name__ == '__main__':
    match=[]
    nmax = int(1E8)
    number = str(5) # Find this number
    workers = 3
    start = time()
    a = _concurrent(nmax, number, workers)
    end = time() - start
    print('main')
    print("found {} in {}sec".format(len(a),end))
Sun Bear

2 Answers


Running your code shows that all three workers are there, but two of them are sleeping. The problem is that `executor.submit(_findmatch, nmax, number)` only tells one worker to execute the function `_findmatch`.

I don't fully understand what your code is doing, but basically you need to either:

  • split the task into three even parts and send each part to a process using `executor.submit`, or
  • split the task into smaller chunks (say, chunks of 100 elements each) and use `map` so that each `_findmatch` call gets only the chunk it is assigned to (see the sketch after this list).
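
A minimal sketch of the second, `map`-based approach, assuming `_findmatch` is reworked to take an explicit sub-range and return a count (the names `_concurrent_map`, `starts`, and `stops` are illustrative, not from the question):

import concurrent.futures as cf
from itertools import repeat

def _findmatch(nmin, nmax, number):
    # count how many numbers in [nmin, nmax) contain the given digit
    return sum(1 for n in range(nmin, nmax) if number in str(n))

def _concurrent_map(nmax, number, workers):
    chunk = nmax // workers
    starts = [i * chunk for i in range(workers)]
    stops = starts[1:] + [nmax]  # the last chunk absorbs any remainder
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # executor.map zips the iterables, so each worker call receives one
        # (start, stop, number) triple; results come back in submission order
        return sum(executor.map(_findmatch, starts, stops, repeat(number)))

if __name__ == '__main__':
    print(_concurrent_map(int(1E8), '5', 3))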
hansaplast

The problem with your code is that it submits only one task, which will then be executed by one of the workers while the rest of them are doing nothing. You need to submit multiple tasks that can be executed by the workers in parallel.

The example below splits the search area into three different tasks, each of which is executed by a different worker. The futures returned by `submit` are added to a list, and once all of them have been submitted, `wait` is used to wait for them all to complete. If you called `result` immediately after submitting a task, it would block until that future completes.

Note that instead of generating a list of matching numbers, the code below just counts the numbers that contain the digit 5, in order to decrease the memory usage:

import concurrent.futures as cf
from time import time

def _findmatch(nmin, nmax, number):
    print('def _findmatch', nmin, nmax, number)
    start = time()
    count = 0
    for n in range(nmin, nmax):
        if number in str(n):
            count += 1
    end = time() - start
    print("found {} in {}sec".format(count,end))
    return count

def _concurrent(nmax, number, workers):
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        start = time()
        chunk = nmax // workers  # size of the sub-range each worker handles
        futures = []

        for i in range(workers):
            cstart = chunk * i
            # the last chunk runs up to nmax so the whole range is covered
            cstop = chunk * (i + 1) if i != workers - 1 else nmax

            futures.append(executor.submit(_findmatch, cstart, cstop, number))

        # block until every submitted future has finished
        cf.wait(futures)
        res = sum(f.result() for f in futures)
        end = time() - start
        print('with statement of def _concurrent(nmax, number):')
        print("found {} in {}sec".format(res, end))
    return res

if __name__ == '__main__':
    match=[]
    nmax = int(1E8)
    number = str(5) # Find this number
    workers = 3
    start = time()
    a = _concurrent(nmax, number, workers)
    end = time() - start
    print('main')
    print("found {} in {}sec".format(a,end))

Output:

def _findmatch 0 33333333 5
def _findmatch 33333333 66666666 5
def _findmatch 66666666 100000000 5
found 17190813 in 20.09431290626526sec
found 17190813 in 20.443560361862183sec
found 22571653 in 20.47660517692566sec
with statement of def _concurrent(nmax, number):
found 56953279 in 20.6196870803833sec
main
found 56953279 in 20.648695707321167sec
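
As an aside, `concurrent.futures.as_completed` could replace the `wait` call above if you wanted to consume each chunk's count as soon as its worker finishes; a minimal sketch, reusing the `futures` list from the code above:

res = 0
for f in cf.as_completed(futures):
    # each future is already done when yielded, so result() returns immediately
    res += f.result()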
niemmi
  • Thanks. While I am digesting your advice, I have a question. Why is there a need to manually create chunks? Isn't `concurrent.futures.ProcessPoolExecutor` supposed to split the work of solving the given function among its pool of workers automatically? – Sun Bear Feb 05 '17 at 09:56
  • @SunBear: It's your job as a programmer to split the work into chunks that can be run independently by the workers. `ProcessPoolExecutor` takes care that all the chunks given to it are run by the workers. Note that instead of splitting the task into three chunks in the example, I could have split it into 10 different tasks and the end result would have been the same (of course the console output would be different, since `_findmatch` would run 10 times). – niemmi Feb 05 '17 at 14:09
  • Thanks for your pointers. I have rewritten the code to output a list with the occurring numbers. I will post it in my next question, where I compare its performance with `executor.map()`. – Sun Bear Feb 06 '17 at 13:36
  • I have benchmarked `.submit()` and `.map()` against a serial code: http://stackoverflow.com/q/42074501/5722359. Pls comment if you have time. – Sun Bear Feb 06 '17 at 18:21