
I've been exploring multiprocessing and multithreading recently, and I found that in some cases they don't increase the speed of my code at all. Here is one example:

import multiprocessing as mp
import time

dummyList = [1,2,3,4,5,6]
dummyList2 = ['a','b','c','d','e','f']

q_list = mp.Queue()
for i, j  in zip(dummyList, dummyList2):
    q_list.put(i)
    q_list.put(j)

def f(queue):
    q = queue.get()
    print(q)

# if __name__ == "__main__":
#     start = time.perf_counter()
#     while not q_list.empty():
#         p1 = mp.Process(target=f, args=[q_list])
#         p2 = mp.Process(target=f, args=[q_list])
#         p1.start()
#         p2.start()
#         p1.join()
#         p2.join()
#     finish = time.perf_counter()
#     print(f'elapsed time = {finish - start} second(s)')

start = time.perf_counter()
while not q_list.empty():
    f(q_list)
finish = time.perf_counter()
print(f'elapsed time = {finish - start} second(s)')

In the code above, I take one item at a time from a multiprocessing.Queue and print it until the queue is empty. I thought that using multiprocessing in such a case would increase the speed. Surprisingly, it got slower instead, and by a huge margin: without multiprocessing it took only 2 ms, while with multiprocessing it took 690 ms.

Can anyone explain why this is happening, and when is actually the best time to use multiprocessing/multithreading? Thank you.

  • You may find this article helpful: https://stackoverflow.com/questions/18114285/what-are-the-differences-between-the-threading-and-multiprocessing-modules – Jul 31 '21 at 07:22

1 Answer


There is overhead in creating new processes, and further overhead in reading from and writing to multiprocessing queues, that you do not have when simply passing parameters to a function running in the same process. That means your "worker" function, f in this case, must be sufficiently CPU-intensive to justify this additional overhead.
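To see that overhead in isolation, here is a minimal sketch (the names are illustrative) that times a trivial queue round-trip done in the same process against the identical work done in a freshly created process:

import multiprocessing as mp
import time

def noop(q):
    # trivial "work": a single queue write
    q.put(None)

if __name__ == '__main__':
    q = mp.Queue()

    # plain function call in the same process:
    t0 = time.perf_counter()
    noop(q)
    q.get()
    print(f'same process: {time.perf_counter() - t0:.6f} second(s)')

    # identical work, but paying for process creation and teardown:
    t0 = time.perf_counter()
    p = mp.Process(target=noop, args=(q,))
    p.start()
    q.get()
    p.join()
    print(f'new process:  {time.perf_counter() - t0:.6f} second(s)')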

As the article What are the differences between the threading and multiprocessing modules? points out, multithreading is not suitable for CPU-intensive functions because of contention for the Global Interpreter Lock. But because there is far less overhead in creating threads than processes, multithreading is most suitable for functions that spend most of their time waiting for I/O to complete, such as fetching a URL from a website, where very little CPU processing is involved.
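The difference in creation overhead is easy to measure. A rough sketch (the exact numbers vary by platform, especially where processes are spawned rather than forked):

import multiprocessing as mp
import threading
import time

def do_nothing():
    pass

if __name__ == '__main__':
    # create, start and join 20 threads:
    t0 = time.perf_counter()
    threads = [threading.Thread(target=do_nothing) for _ in range(20)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f'20 threads:   {time.perf_counter() - t0:.4f} second(s)')

    # create, start and join 20 processes:
    t0 = time.perf_counter()
    processes = [mp.Process(target=do_nothing) for _ in range(20)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f'20 processes: {time.perf_counter() - t0:.4f} second(s)')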

See the following two benchmarks, where the function is all CPU and no I/O and thus potentially a candidate for multiprocessing. They compare single processing against multiprocessing: in the first case the function is not CPU-intensive and multiprocessing hurts performance, while in the second case the function is much more CPU-intensive and multiprocessing improves performance:

import multiprocessing as mp
import time

QUARTER_SECOND_ITERATIONS = 5_000_000

def quarter_second():
    sum = 0
    for _ in range(QUARTER_SECOND_ITERATIONS):
        sum += 1
    return sum

# non-multiprocessing version:
def compute_square(x, cpu_intensive):
    """ Compute x ** 2 """
    if cpu_intensive:
        quarter_second()
    return x ** 2

# multiprocessing version
def m_compute_square(input_q, output_q, cpu_intensive):
    """ Compute x ** 2: """
    while True:
        x = input_q.get()
        if x is None: # our signal to terminate
            break
        if cpu_intensive:
            quarter_second()
        output_q.put((x, x ** 2))  # pair each result with its input; results can arrive in any order

def main():
    numbers = range(1, 101)

    for intensive in (False, True):
        t0 = time.perf_counter()
        results = [compute_square(x, cpu_intensive=intensive) for x in numbers]
        t1 = time.perf_counter()
        print(f'Non-multiprocessing time = {t1 - t0}, intensive = {intensive}')
        t0 = time.perf_counter()
        input_queue = mp.Queue()
        output_queue = mp.Queue()
        for x in numbers:
            input_queue.put(x)
        # Put two "no more input" indicators:
        input_queue.put(None)
        input_queue.put(None)
        p1 = mp.Process(target=m_compute_square, args=(input_queue, output_queue, intensive))
        p2 = mp.Process(target=m_compute_square, args=(input_queue, output_queue, intensive))
        p1.start()
        p2.start()
        # drain all 100 (x, x ** 2) result tuples; their order is not guaranteed
        results = [output_queue.get() for _ in range(100)]
        p1.join()
        p2.join()
        t1 = time.perf_counter()
        print(f'Multiprocessing time = {t1 - t0}, intensive = {intensive}')

# Required for Windows:
if __name__=='__main__':
    main()

Prints:

Non-multiprocessing time = 3.600000000000825e-05, intensive = False
Multiprocessing time = 0.1501859, intensive = False
Non-multiprocessing time = 25.417471099999997, intensive = True
Multiprocessing time = 14.596532500000002, intensive = True
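With two worker processes, the CPU-intensive run drops from about 25.4 to 14.6 seconds. A perfect halving would be about 12.7 seconds; the shortfall is the process-creation and queue overhead described above. For the trivial function, that overhead is essentially all the multiprocessing version does, which is why it is several thousand times slower.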

Using a Multiprocessing Pool

import multiprocessing as mp
from functools import partial
import time


QUARTER_SECOND_ITERATIONS = 5_000_000

def quarter_second():
    sum = 0
    for _ in range(QUARTER_SECOND_ITERATIONS):
        sum += 1
    return sum

# non-multiprocessing version:
def compute_square(x, cpu_intensive):
    """ Compute x ** 2 """
    if cpu_intensive:
        quarter_second()
    return x ** 2

def main():
    numbers = range(1, 101)

    for intensive in (False, True):
        t0 = time.perf_counter()
        results = [compute_square(x, cpu_intensive=intensive) for x in numbers]
        t1 = time.perf_counter()
        print(f'Non-multiprocessing time = {t1 - t0}, intensive = {intensive}')
        t0 = time.perf_counter()
        # create processing pool using all 8 processors:
        with mp.Pool(8) as pool:
            worker = partial(compute_square, cpu_intensive=intensive)
            results = pool.map(worker, numbers)
        t1 = time.perf_counter()
        print(f'Multiprocessing time = {t1 - t0}, intensive = {intensive}')

# Required for Windows:
if __name__=='__main__':
    main()

Prints:

Non-multiprocessing time = 3.9300000000006e-05, intensive = False
Multiprocessing time = 0.22172129999999995, intensive = False
Non-multiprocessing time = 26.1021124, intensive = True
Multiprocessing time = 7.3056439, intensive = True
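With a pool of 8 workers, the CPU-intensive run drops from about 26 to 7.3 seconds, and the pool avoids paying process-creation costs for every task. As an aside, the same pattern can also be written with the standard library's concurrent.futures module instead of multiprocessing.Pool; here is a minimal sketch of the same CPU-intensive benchmark (the definitions are repeated so it runs stand-alone):

from concurrent.futures import ProcessPoolExecutor
from functools import partial
import time

QUARTER_SECOND_ITERATIONS = 5_000_000

def quarter_second():
    sum = 0
    for _ in range(QUARTER_SECOND_ITERATIONS):
        sum += 1
    return sum

def compute_square(x, cpu_intensive):
    """ Compute x ** 2 """
    if cpu_intensive:
        quarter_second()
    return x ** 2

def main():
    numbers = range(1, 101)
    for intensive in (False, True):
        t0 = time.perf_counter()
        # map blocks until all results are in and preserves input order:
        with ProcessPoolExecutor(max_workers=8) as executor:
            worker = partial(compute_square, cpu_intensive=intensive)
            results = list(executor.map(worker, numbers))
        t1 = time.perf_counter()
        print(f'ProcessPoolExecutor time = {t1 - t0}, intensive = {intensive}')

# Required for Windows:
if __name__ == '__main__':
    main()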

Using a Multithreading Pool

from multiprocessing.pool import ThreadPool
from functools import partial
import time


QUARTER_SECOND_ITERATIONS = 5_000_000

def quarter_second():
    sum = 0
    for _ in range(QUARTER_SECOND_ITERATIONS):
        sum += 1
    return sum

# non-multithreading version:
def compute_square(x, cpu_intensive):
    """ Compute x ** 2 """
    if cpu_intensive:
        quarter_second()
    return x ** 2

def main():
    numbers = range(1, 101)

    for intensive in (False, True):
        t0 = time.perf_counter()
        results = [compute_square(x, cpu_intensive=intensive) for x in numbers]
        t1 = time.perf_counter()
        print(f'Non-multithreading time = {t1 - t0}, intensive = {intensive}')
        t0 = time.perf_counter()
        # create a thread pool with 8 worker threads:
        with ThreadPool(8) as pool:
            worker = partial(compute_square, cpu_intensive=intensive)
            results = pool.map(worker, numbers)
        t1 = time.perf_counter()
        print(f'Multithreading time = {t1 - t0}, intensive = {intensive}')

# Required for Windows:
if __name__=='__main__':
    main()

Prints:

Non-multithreading time = 3.0000000000002247e-05, intensive = False
Multithreading time = 0.03963000000000001, intensive = False
Non-multithreading time = 26.428487699999998, intensive = True
Multithreading time = 29.0095318, intensive = True

Because the "worker" function is pure CPU, multithreading cannot improve performance and in fact just adds additional overhead.

Multithreading Pool Where Worker Function is Mostly "I/O"

In the following benchmark, compute_square emulates waiting for I/O to complete by sleeping. In this case it is a candidate for multithreading, as it spends most of its time not executing actual Python bytecode, so there is little contention for the Global Interpreter Lock.

from multiprocessing.pool import ThreadPool
from functools import partial
import time

def compute_square(x):
    """ Compute x ** 2 """
    time.sleep(.25)
    return x ** 2

def main():
    numbers = range(1, 101)

    t0 = time.perf_counter()
    results = [compute_square(x) for x in numbers]
    t1 = time.perf_counter()
    print(f'Non-multithreading time = {t1 - t0}')
    t0 = time.perf_counter()
    # create a thread pool with 8 worker threads:
    with ThreadPool(8) as pool:
        results = pool.map(compute_square, numbers)
    t1 = time.perf_counter()
    print(f'Multithreading time = {t1 - t0}')

if __name__=='__main__':
    main()

Prints:

Non-multithreading time = 25.1188871
Multithreading time = 4.039328099999999
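In short: reach for multiprocessing when each task is CPU-intensive enough to amortize the process-creation and queue overhead, reach for multithreading when tasks spend most of their time waiting on I/O, and stay with plain sequential code for trivial tasks like the one in your question.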
– Booboo