9

I am using the concurrent.futures module to do multiprocessing and multithreading. I am running it on an 8-core machine with 16 GB RAM and an Intel i7 8th Gen processor. I tried this on Python 3.7.2 and also on Python 3.8.2.

import concurrent.futures
import time
# takes a list and multiplies each elem by 2
def double_value(x):
  y = []
  for elem in x:
    y.append(2 * elem)
  return y
# multiply an elem by 2
def double_single_value(x):
  return 2 * x
# define a
import numpy as np
a = np.arange(100000000).reshape(100, 1000000)
# function to run multiple threads and multiply each elem by 2
def get_double_value(x):
  with concurrent.futures.ThreadPoolExecutor() as executor:
    results = executor.map(double_single_value, x)
  return list(results)

The code shown below ran in 115 seconds. It uses only multiprocessing. CPU utilization for this piece of code is 100%.

t = time.time()

with concurrent.futures.ProcessPoolExecutor() as executor:
  my_results = executor.map(double_value, a)
print(time.time()-t)

The function below took more than 9 minutes, consumed all the RAM of the system, and then the system killed all the processes. Also, CPU utilization during this piece of code did not reach 100% (~85%).

t = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
  my_results = executor.map(get_double_value, a)

print(time.time()-t)

I really want to understand:

1) Why is the code that first splits into multiple processes and then runs multi-threading inside each process not running faster than the code that uses only multiprocessing?

(I have gone through many posts that describe multiprocessing and multi-threading, and the crux I got is that multi-threading is for I/O-bound processes and multiprocessing for CPU-bound processes.)

2) Is there any better way of doing multi-threading inside multiprocessing for maximum utilization of the allotted cores (or CPUs)?

3) Why did that last piece of code consume all the RAM? Was it due to multi-threading?

learner
  • What does the my_double_value function do? – Paul Jun 19 '20 at 13:02
  • @Paul My bad, that was a mistake; I have corrected it. Please check now – learner Jun 19 '20 at 13:56
  • What version of python are you using? The ThreadPoolExecutor has changed how many workers it's willing to spawn by default in 3.8. – EnticingCanine Jun 19 '20 at 14:29
  • I am using Python 3.7.2 – learner Jun 19 '20 at 14:38
  • @EnticingCanine I am getting the same results with 3.8.2 also. Just checked – learner Jun 19 '20 at 15:00
  • A pool typically creates a process/thread for each CPU, so your example creates 8 processes, each of which creates 8 threads. The threads are running CPU-bound Python code so will serialize due to the GIL. All-in-all not efficient. – Mark Tolonen Jun 19 '20 at 15:44
  • @MarkTolonen Yes, it seems to be an inefficient use of the extra cores. I am trying to figure out a case, basically "how to" create multiple threads inside multiple processes. Any thoughts on that? – learner Jun 19 '20 at 15:46
  • 1
    For a CPU-bound task, a process per CPU is as efficient as you'll get, assuming the CPU-bound task is large enough to warrant the overhead of starting the processes and transferring the data to them. And threads don't help at all in CPython for a CPU-bound task due to the GIL. – Mark Tolonen Jun 19 '20 at 16:23
  • Is there any prior way to know whether a task will fit the CPU or not? If a task is CPU-bound, can I assume that multiprocessing will work faster as compared to multi-threading? – learner Jun 19 '20 at 16:37

6 Answers

5

You can mix concurrency with parallelism. Why? You can have your valid reasons. Imagine a bunch of requests you have to make while processing their responses (e.g., converting XML to JSON) as fast as possible.

I did some tests and here are the results. In each test, I mix different approaches to execute a print 16,000 times (I have 8 cores and 16 threads).

Parallelism with multiprocessing, concurrency with asyncio

The fastest, 1.1152372360229492 sec.

import asyncio
import multiprocessing
import os
import psutil
import threading
import time

async def print_info(value):
    await asyncio.sleep(1)
    print(
        f"THREAD: {threading.get_ident()}",
        f"PROCESS: {os.getpid()}",
        f"CORE_ID: {psutil.Process().cpu_num()}",
        f"VALUE: {value}",
    )

async def await_async_logic(values):
    await asyncio.gather(
        *(
            print_info(value)
            for value in values
        )
    )

def run_async_logic(values):
    asyncio.run(await_async_logic(values))

def multiprocessing_executor():
    start = time.time()
    with multiprocessing.Pool() as multiprocessing_pool:
        multiprocessing_pool.map(
            run_async_logic,
            (range(1000 * x, 1000 * (x + 1)) for x in range(os.cpu_count())),
        )
    end = time.time()
    print(end - start)

multiprocessing_executor()

Very important note: with asyncio I can spam as many tasks as I want. For example, I can change the value from 1000 to 10000 to generate 160,000 prints and there is no problem (I tested it and it took 2.0210490226745605 sec).

Parallelism with multiprocessing, concurrency with threading

An alternative option, 1.6983509063720703 sec.

import multiprocessing
import os
import psutil
import threading
import time

def print_info(value):
    time.sleep(1)
    print(
        f"THREAD: {threading.get_ident()}",
        f"PROCESS: {os.getpid()}",
        f"CORE_ID: {psutil.Process().cpu_num()}",
        f"VALUE: {value}",
    )

def multithreading_logic(values):
    threads = []
    for value in values:
        threads.append(threading.Thread(target=print_info, args=(value,)))
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

def multiprocessing_executor():
    start = time.time()
    with multiprocessing.Pool() as multiprocessing_pool:
        multiprocessing_pool.map(
            multithreading_logic,
            (range(1000 * x, 1000 * (x + 1)) for x in range(os.cpu_count())),
        )
    end = time.time()
    print(end - start)

multiprocessing_executor()

Very important note: with this method I can NOT spam as many tasks as I want. If I change the value from 1000 to 10000, I get RuntimeError: can't start new thread. I also want to say that I am impressed, because I thought this method would be better in every aspect compared to asyncio, but it turned out to be quite the opposite.

Parallelism and concurrency with concurrent.futures

Extremely slow, 50.08251595497131 sec.

import os
import psutil
import threading
import time
from concurrent.futures import thread, process

def print_info(value):
    time.sleep(1)
    print(
        f"THREAD: {threading.get_ident()}",
        f"PROCESS: {os.getpid()}",
        f"CORE_ID: {psutil.Process().cpu_num()}",
        f"VALUE: {value}",
    )

def multithreading_logic(values):
    with thread.ThreadPoolExecutor() as multithreading_executor:
        multithreading_executor.map(
            print_info,
            values,
        )

def multiprocessing_executor():
    start = time.time()
    with process.ProcessPoolExecutor() as multiprocessing_executor:
        multiprocessing_executor.map(
            multithreading_logic,
            (range(1000 * x, 1000 * (x + 1)) for x in range(os.cpu_count())),
        )
    end = time.time()
    print(end - start)

multiprocessing_executor()

Very important note: with this method, as with asyncio, I can spam as many tasks as I want. For example, I can change the value from 1000 to 10000 to generate 160000 prints and there is no problem (except for the time).

Extra notes

For these notes, I modified the tests so that they only make 1,600 prints (replacing the value 1000 with 100 in each test).

When I remove the parallelism from asyncio, the execution takes me 16.090194702148438 sec. In addition, if I replace the await asyncio.sleep(1) with time.sleep(1), it takes 160.1889989376068 sec.

Removing the parallelism from the multithreading option, the execution takes me 16.24941658973694 sec. Right now I am impressed. Multithreading without multiprocessing gives me good performance, very similar to asyncio.

Removing parallelism from the third option, execution takes me 80.15227723121643 sec.

Lucas Vazquez
3

As you say: "I have gone through many posts that describe multiprocessing and multi-threading and one of the crux points I got is that multi-threading is for I/O processes and multiprocessing for CPU processes".

You need to figure out whether your program is I/O-bound or CPU-bound, then apply the correct method to solve your problem. Applying various methods at random, or all together at the same time, usually only makes things worse.
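
For a quick check of which category a task falls into, here is a minimal sketch of my own (not part of this answer) that times the question's doubling task under both executors on a smaller array; the bench helper and the array size are illustrative choices. On CPython the thread pool should come out no faster than the process pool for this CPU-bound work.

import time
import numpy as np
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def double_value(row):
    # CPU-bound work: double every element of one row
    return [2 * elem for elem in row]

def bench(executor_cls, data):
    t = time.time()
    with executor_cls() as executor:
        list(executor.map(double_value, data))
    return time.time() - t

if __name__ == "__main__":
    # smaller than the array in the question, just to compare the two executors
    a = np.arange(10_000_000).reshape(100, 100_000)
    print("threads:  ", bench(ThreadPoolExecutor, a))
    print("processes:", bench(ProcessPoolExecutor, a))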

lenik
  • Yes, even in my current case multi-threading is taking the lead, but the operation is to get the double of a number, which is a CPU-oriented process – learner Jun 19 '20 at 15:44
  • Is there any case where multi-threading & multiprocessing are used together (in practical development)? – han shih May 20 '22 at 03:50
  • 1
    @hanshih Theoretically, it's possible that you have a CPU-bound task slated for multiprocessing, while some of the processes might use threads for I/O-bound HDD access or network communication. – lenik May 20 '22 at 08:01
1

Using threads in pure Python for CPU-bound problems is a bad approach, regardless of whether multiprocessing is involved or not. Try to redesign your app to use only multiprocessing, or use third-party libs such as Dask and so on.
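
As a rough illustration of the Dask route (my sketch, assuming dask is installed; not code from this answer), the doubling task from the question can be expressed as a chunked array operation, and the scheduler parallelizes the chunks for you. The chunk size here is an illustrative choice.

import dask.array as da

# lazily build a chunked array instead of looping over rows yourself
x = da.arange(100_000_000, chunks=1_000_000)
doubled = (2 * x).compute()  # the scheduler runs the chunks in parallel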

alex_noname
  • Can you please provide more info on why it is a bad practice, even though a threading lib comes with Python? Doesn't it take care of all the GIL handling? – learner Jun 21 '20 at 12:58
1

I believe you figured it out, but I wanted to answer. Obviously, your function double_single_value is CPU-bound. It has nothing to do with I/O. In CPU-bound tasks, using multiple threads makes things worse than using a single thread, because the GIL does not let you actually run on multiple threads, so you effectively end up running on a single thread. Also, a thread may be switched out before finishing its task; when it gets back, its work has to be loaded onto the CPU again, which makes this even slower.
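
To make the GIL effect concrete, here is a small sketch of my own (not from this answer): the same pure-Python, CPU-bound loop run serially and then through a thread pool finishes in roughly the same time, because only one thread executes Python bytecode at a time. The burn function and the repetition counts are just illustrative.

import time
from concurrent.futures import ThreadPoolExecutor

def burn(n):
    # pure-Python, CPU-bound loop; it holds the GIL while it runs
    total = 0
    for i in range(n):
        total += i * i
    return total

N = 2_000_000

t = time.time()
for _ in range(8):
    burn(N)
print("serial: ", time.time() - t)

t = time.time()
with ThreadPoolExecutor(max_workers=8) as ex:
    list(ex.map(burn, [N] * 8))
print("threads:", time.time() - t)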

Ferid Heziyev
1

Based on your code, I see that most of it deals with computations (calculations), so it is best to use multiprocessing to solve your problem, since it is CPU-bound and NOT I/O-bound (things like sending requests to websites and waiting for the server's response, writing to disk, or even reading from disk). This is true for Python programming as far as I know. The Python GIL (Global Interpreter Lock) will make your code run slowly, as it is a mutex (or a lock) that allows only one thread to take control of the Python interpreter at a time, meaning you won't achieve parallelism but concurrency instead. It is perfectly fine to use threading for I/O-bound tasks, because threads will beat multiprocessing in execution times there, but for your case I would encourage you to use multiprocessing, because each Python process gets its own Python interpreter and memory space, so the GIL won't be a problem for you.

I am not so sure about integrating multithreading with multiprocessing, but what I know is that it can cause inconsistency in the processed results, since you will need more boilerplate code for data synchronization if you want the processes to communicate (IPC). Also, threads are kind of unpredictable (thus inconsistent at times) since they are controlled by the OS, so they can be swapped out at any time (pre-emptive scheduling of kernel-level threads due to time sharing). I don't stop you from writing that code, but be really sure of what you are doing. You never know, you might propose a solution to it one day.
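
If you follow that advice and drop the inner threads, a multiprocessing-only sketch of the question's task could look like this (my illustration, not the answer's code; the chunksize value is just a guess to batch rows and reduce pickling round-trips between the parent process and the workers):

import numpy as np
from concurrent.futures import ProcessPoolExecutor

def double_value(row):
    return [2 * elem for elem in row]

if __name__ == "__main__":
    a = np.arange(100_000_000).reshape(100, 1_000_000)
    with ProcessPoolExecutor() as executor:
        # chunksize batches several rows per task (illustrative value)
        my_results = list(executor.map(double_value, a, chunksize=4))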

winterr_dog
1

Yes there is - I use it all the time, and it wins on speed compared to each approach on its own or to some of the methods referred to above.

I'll use the common Prime program with the two methods incorporated as utility functions (thread_all and thread_all_p for multithreading and multiprocessing respectively):

import itertools
import math, time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import wraps
def data_mygrouper(n,
                   iterable):  # https://stackoverflow.com/questions/1624883/alternative-way-to-split-a-list-into-groups-of-n
    args = [iter(iterable)] * n
    return ([e for e in t if e != None] for t in itertools.zip_longest(*args))


def timeit(method):
    @wraps(method)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = method(*args, **kwargs)
        end_time = time.time()
        print(f"{method.__name__} => {(end_time-start_time)*1000} ms")
        return result
    return wrapper

def is_prime(n):
    if n == 2:  # 2 is prime (the original code returned False here)
        return n, True
    if n < 2 or n % 2 == 0:
        return n, False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return n, False
    return n, True

@timeit
def main():
    n_temp4, n_temp = [], []
    PRIMES = [num for num in range(19000, 200000)]

    #thread outside processsor inside
    PRIMES_v2 = list(data_mygrouper(round(len(PRIMES)/5 , ndigits=None), PRIMES))
    n_temp3 = thread_all(PRIMES_v2, thread_all_p, is_prime)
    n_temp4 = []
    n_temp4 += (list(x) for x in list(n_temp3))
    print(n_temp4)



def thread_all(ps, fn, parm='', workers=61, chunk=100000):
    # https://stackoverflow.com/questions/42056738/how-to-pass-a-function-with-more-than-one-argument-to-python-concurrent-futures/4205696 9000
    # 75
    print(f'thread_all({ps}, {fn}, {parm})')
    try:
        if parm == '':
            with ThreadPoolExecutor(max_workers=max(1, workers)) as executor:
                return executor.map(fn, ps, timeout=90, chunksize=max(1, chunk))
        else:
            with ThreadPoolExecutor(max_workers=max(1, workers)) as executor:
                return executor.map(fn, ps, itertools.repeat(parm, len(ps)), timeout=90, chunksize=max(1, chunk))
    except: pass

def thread_all_p(ps, fn, parm='', workers=61, chunk=100000):
    # https://stackoverflow.com/questions/42056738/how-to-pass-a-function-with-more-than-one-argument-to-python-concurrent-futures/4205696 9000
    # 75
    print(f'thread_all_p({ps}, {fn}, {parm})')
    try:
        if parm == '':
            with ProcessPoolExecutor(max_workers=max(1, workers)) as executor:
                return executor.map(fn, ps, timeout=90, chunksize=max(1, chunk))
        else:
            with ProcessPoolExecutor(max_workers=max(1, workers)) as executor:
                return executor.map(fn, ps, itertools.repeat(parm, len(ps)), timeout=90, chunksize=max(1, chunk))
    except: pass


if __name__ == "__main__":
    main()

Time (13th gen i9, 64 GB aka 'beast' ☺): 1.7 - 1.87(!!) seconds. Challenges welcome on the same basis as the code above... jokes.


Kudos:

  • Prime program - archived version here
JB-007