1

I'm trying to implement a task in parallel using Concurrent. Please find below a piece of code for it:

import os
import time
from concurrent.futures import ProcessPoolExecutor as PE
import concurrent.futures

# num CPUs
cpu_num = len(os.sched_getaffinity(0))
print("Number of cpu available : ",cpu_num)

# max_Worker = cpu_num
max_Worker = 1

# A fake input array
n=1000000
array = list(range(n))
results = []

# A fake function being applied to each element of array 
def task(i):
  return i**2 

x = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=max_Worker) as executor:
  features = {executor.submit(task, j) for j in array}

  # the real function is heavy and we need to be sure of completeness of each run
  for future in concurrent.futures.as_completed(features):
    results.append(future.result())
      
results = [future.result() for future in features]
y = time.time()

print('=========================================')
print(f"Train data preparation time (s): {(y-x)}")
print('=========================================')

And now my questions,

  1. Although there is no error, is it correct/optimized?
  2. While playing with the number of workers, seems there is no improvement in the speed (e.g., 1 vs 16, no difference). Then, what's the problem and how can be solved?

Thanks in advance,

ir0098
  • 127
  • 1
  • 13
  • Are you sure threading is the right approach? Looks to me like multiprocessing (ProcessPoolExecutor) would be the better fit (here). – Timus Jul 02 '21 at 13:54
  • This is a result of CPython's GIL (Global Interpreter Lock). ( https://stackoverflow.com/a/6821529/2985796 ). There are a few options. First look into the new-ish `multiprocessing` library. If that doesn't cut it in terms of the speed you desire you may have to move to a non-standard Python implementation or more than likely a generally faster programming language. – KDecker Jul 02 '21 at 13:54
  • You have 3 issues: 1. `max_Worker` is 1, so by definition there will be no concurrency. 2. Your worker function, `task`, is pure Python byte-code with no I/O to release Global Interpreter Lock so you cannot *expect* to release the GIL (i.e. this is a poor candidate for multithreading). 3. This is also a poor candidate for multiprocessing because *too little work* is being done by worker function `task` to offset the overhead of passing arguments and results to and from one process to another. I would not expect much if any improvement. – Booboo Jul 02 '21 at 14:17

1 Answers1

2

See my comment to your question. To the overhead I mentioned in that comment you need to also add the oberhead in just creating the process pool itself.

The following is a benchmark with several results. The first is a timing from just calling the worker function task 100000 times and creating a results list and printing out the last element of that list. It will become apparent why I have reduced the number of times I am calling task from 1000000 to 100000.

The next attempt is to use multiprocessing to accomplish the same thing using a ProcessPoolExecutor with the submit method and then processing the Future instances that are returned.

The next attempt is to instead use the map method with the default chunksize argument of 1 being used. It is important to understand this argument. With a chunksize value of 1, each element of the iterable that is passed to the map method is written individually to a queue of tasks as a chunk to be processed by the processes in the pool. When a pool process becomes idle looking for work, it pulls from the queue the next chunk of tasks to be performed, processes each task comprising the chunk and then becomes idle again. When there are a lot of submitted tasks being submitted via map, a chunksize value of 1 is inefficient. You would expect its performance to be equivalent to repeatedly issuing submit calls for each element of the iterable.

The next attempt specifies a chunksize value which approximates more or less the value that the map function used by the Pool class in the multiprocessing package would have used by default. As you can see, the improvement is dramatic, but still not an improvement over the non-multiprocessing case.

The final attempt uses the multiprocessing faciltity provided by package multiprocessing and its multiprocessing.pool.Pool class. The difference in this benchmark is that its map function uses a more intelligent default chunksize when no chunksize argument is specified.

import os
import time
from concurrent.futures import ProcessPoolExecutor as PE
from multiprocessing import Pool

# A fake function being applied to each element of array
def task(i):
  return i**2

# required for Windows:
if __name__ == '__main__':
    n=100000

    t1 = time.time()
    results = [task(i) for i in range(n)]
    print('Non-multiprocessing time:', time.time() - t1, results[-1])

    # num CPUs
    cpu_num = os.cpu_count()
    print("Number of CPUs available: ",cpu_num)

    t1 = time.time()
    with PE(max_workers=cpu_num) as executor:
        futures = [executor.submit(task, i) for i in range(n)]
        results = [future.result() for future in futures]
    print('Multiprocessing time using submit:', time.time() - t1,  results[-1])

    t1 = time.time()
    with PE(max_workers=cpu_num) as executor:
        results = list(executor.map(task, range(n)))
    print('Multiprocessing time using map:', time.time() - t1, results[-1])

    t1 = time.time()
    chunksize = n // (4 * cpu_num)
    with PE(max_workers=cpu_num) as executor:
        results = list(executor.map(task, range(n), chunksize=chunksize))
    print(f'Multiprocessing time using map: {time.time() - t1}, chunksize: {chunksize}', results[-1])

    t1 = time.time()
    with Pool(cpu_num) as executor:
        results = executor.map(task, range(n))
    print('Multiprocessing time using Pool.map:', time.time() - t1, results[-1])

Prints:

Non-multiprocessing time: 0.027019739151000977 9999800001
Number of CPUs available:  8
Multiprocessing time using submit: 77.34723353385925 9999800001
Multiprocessing time using map: 79.52981925010681 9999800001
Multiprocessing time using map: 0.30500149726867676, chunksize: 3125 9999800001
Multiprocessing time using Pool.map: 0.2799997329711914 9999800001

Update

The following bechmarks use a version of task that is very CPU-intensive and shows the benefit of multiprocessing. It would also seem for this small iterable size (100), forcing a chunksize value of 1 for the Pool.map case (it would by default compute a chunksize value of 4), is slightly more performant.

import os
import time
from concurrent.futures import ProcessPoolExecutor as PE
from multiprocessing import Pool

# A fake function being applied to each element of array
def task(i):
    for _ in range(1_000_000):
        result = i ** 2
    return result

def compute_chunksize(iterable_size, pool_size):
    chunksize, remainder = divmod(iterable_size, pool_size * 4)
    if remainder:
        chunksize += 1
    return chunksize

# required for Windows:
if __name__ == '__main__':
    n = 100
    cpu_num = os.cpu_count()
    chunksize = compute_chunksize(n, cpu_num)

    t1 = time.time()
    results = [task(i) for i in range(n)]
    t2 = time.time()
    print('Non-multiprocessing time:', t2 - t1, results[-1])

    # num CPUs
    print("Number of CPUs available: ",cpu_num)

    t1 = time.time()
    with PE(max_workers=cpu_num) as executor:
        futures = [executor.submit(task, i) for i in range(n)]
        results = [future.result() for future in futures]
        t2 = time.time()
    print('Multiprocessing time using submit:', t2 - t1,  results[-1])

    t1 = time.time()
    with PE(max_workers=cpu_num) as executor:
        results = list(executor.map(task, range(n)))
        t2 = time.time()
    print('Multiprocessing time using map:', t2 - t1, results[-1])

    t1 = time.time()

    with PE(max_workers=cpu_num) as executor:
        results = list(executor.map(task, range(n), chunksize=chunksize))
        t2 = time.time()
    print(f'Multiprocessing time using map: {t2 - t1}, chunksize: {chunksize}', results[-1])

    t1 = time.time()
    with Pool(cpu_num) as executor:
        results = executor.map(task, range(n))
        t2 = time.time()
    print('Multiprocessing time using Pool.map:', t2 - t1, results[-1])

    t1 = time.time()
    with Pool(cpu_num) as executor:
        results = executor.map(task, range(n), chunksize=1)
        t2 = time.time()
    print('Multiprocessing time using Pool.map (chunksize=1):', t2 - t1, results[-1])

Prints:

Non-multiprocessing time: 23.12758779525757 9801
Number of CPUs available:  8
Multiprocessing time using submit: 5.336004018783569 9801
Multiprocessing time using map: 5.364996671676636 9801
Multiprocessing time using map: 5.444890975952148, chunksize: 4 9801
Multiprocessing time using Pool.map: 5.400001287460327 9801
Multiprocessing time using Pool.map (chunksize=1): 4.698001146316528 9801
Booboo
  • 38,656
  • 3
  • 37
  • 60
  • thank you very much for your detailed response and your time, indeed it is much more than clear now and definitely would be helpful for others as well. Just another question, I tried to give a trial with a virtual machine having 32 VIRTUAL CPUs on the google cloud; 32 vCPUs are slower than my laptop 8 CPUs. How this can be addressed? e.g., in the case of pool map: 8 CPUs: Pool.map: {0.18767499923706055} 32 vCPUs: Pool.map: {0.23704195022583008} 9999800001 – ir0098 Jul 02 '21 at 16:06
  • I've updated the answer with new benchmarks with the updated `task` function, which show the benefits of multiprocessing. I have also changed the timing methodology to not count any time it takes for terminating the process pool (which is relatively negligible). – Booboo Jul 02 '21 at 17:11
  • I've updated the answer with `compute_chunksize`, which is *essentially* the algorithm used by the `Pool.map` method when no explicit *chunksize* value is specified. What I had previously was oversimplified because it ignored the remainder that was left after the division. If you have a *large* number of tasks to run, then you should try to use a `map` call and specify an explicit, *chunksize* value. if you are using `ProcessPoolExecutor.map`. Also look at `Pool.imap` and `Pool.imap_unordered` with suitable *chunksize*, which can use generators for the *iterable*. which can save storage. – Booboo Jul 02 '21 at 17:45
  • Thanks a lot for your helps, in fact, it is also being used for generators. – ir0098 Jul 02 '21 at 17:51
  • `ProcessPoolExecutor.map` will also work with generators without having to convert the *iterable* to a list. But that is why its default *chunksize* value is 1; in general it cannot know the size of the *iterable*. But `Pool,map` converts the *iterable* argument to a list in order to get its length so it can compute a good *chunksize* value. That's why if you are using a generator and you want to evaluate it "lazily", i.e. not first convert it to a list which will potentially take a lot of storage, you must use `imap`. But then you should supply a *chunksize* value. – Booboo Jul 02 '21 at 18:31
  • You might want to take a look at https://stackoverflow.com/questions/20776189/concurrent-futures-vs-multiprocessing-in-python-3/60644649#60644649, which compares the `Pool` and `ProcessPoolExecutor` classes. It's a bit rambling, but it might be useful -- even educational. – Booboo Jul 03 '21 at 15:53