10

I am trying to use multiprocessing in python 3.6. I have a for loopthat runs a method with different arguments. Currently, it is running one at a time which is taking quite a bit of time so I am trying to use multiprocessing. Here is what I have:

def test(self):
    for key, value in dict.items():
        pool = Pool(processes=(cpu_count() - 1))
        pool.apply_async(self.thread_process, args=(key,value))
        pool.close()
        pool.join()


def thread_process(self, key, value):
    # self.__init__()
    print("For", key)

I think what my code is using 3 processes to run one method but I would like to run 1 method per process but I don't know how this is done. I am using 4 cores btw.

anderish
  • 1,709
  • 6
  • 25
  • 58
  • use "threading" or other python library for multi threading – Tamar Jun 20 '17 at 18:42
  • @Tamar don't, threading implies using a single core in Python. It will run functions in parallel, but not concurrently (at the same time). – Pedro von Hertwig Batista Jun 20 '17 at 18:45
  • Yes I tried multithreading, but it did not speed up runtime. I tried multiprocessing and it sped up the runtime a lot but I didn't provide a limit therefore, it almost destroyed my computer. – anderish Jun 20 '17 at 18:46
  • @PedrovonHertwig you have those two terms completely bushwackled. Parallel means at the same time, concurrent *can* mean at the same time, but it depends on the context of the hardware and ecosystem because of the use of threading (which parallel processing doesn't need to exist). please read this https://stackoverflow.com/questions/1897993/what-is-the-difference-between-concurrent-programming-and-parallel-programming – Krupip Jun 20 '17 at 18:49

6 Answers6

9

You're making a pool at every iteration of the for loop. Make a pool beforehand, apply the processes you'd like to run in multiprocessing, and then join them:

from multiprocessing import Pool, cpu_count
import time

def t():
    # Make a dummy dictionary
    d = {k: k**2 for k in range(10)}

    pool = Pool(processes=(cpu_count() - 1))

    for key, value in d.items():
        pool.apply_async(thread_process, args=(key, value))

    pool.close()
    pool.join()


def thread_process(key, value):
    time.sleep(0.1)  # Simulate a process taking some time to complete
    print("For", key, value)

if __name__ == '__main__':
    t()
1

You're not populating your multiprocessing.Pool with data - you're re-initializing the pool on each loop. In your case you can use Pool.map() to do all the heavy work for you:

def thread_process(args):
    print(args)

def test():
    pool = Pool(processes=(cpu_count() - 1))
    pool.map(thread_process, your_dict.items())
    pool.close()

if __name__ == "__main__":  # important guard for cross-platform use
    test()

Also, given all those self arguments I reckon you're snatching this off of a class instance and if so - don't, unless you know what you're doing. Since multiprocessing in Python essentially works as, well, multi-processing (unlike multi-threading) you don't get to share your memory, which means your data is pickled when exchanging between processes, which means anything that cannot be pickled (like instance methods) doesn't get called. You can read more on that problem on this answer.

zwer
  • 24,943
  • 3
  • 48
  • 66
1

I think what my code is using 3 processes to run one method but I would like to run 1 method per process but I don't know how this is done. I am using 4 cores btw.

No, you are in fact using the correct syntax here to utilize 3 cores to run an arbitrary function independently on each. You cannot magically utilize 3 cores to work together on one task with out explicitly making that a part of the algorithm itself/ coding that your self often using threads (which do not work the same in python as they do outside of the language).

You are however re-initializing the pool every loop you'll need to do something like this instead to actually perform this properly:

    cpus_to_run_on = cpu_count() - 1
    pool = Pool(processes=(cpus_to_run_on)
    # don't call a dictionary a dict, you will not be able to use dict() any 
    # more after that point, that's like calling a variable len or abs, you 
    # can't use those functions now
    pool.map(your_function, your_function_args)
    pool.close()

Take a look at the python multiprocessing docs for more specific information if you'd like to get a better understanding of how it works. Under python, you cannot utilize threading to do multiprocessing with the default CPython interpreter. This is because of something called the global interpreter lock, which stops concurrent resource access from within python itself. The GIL doesn't exist in other implementations of the language, and is not something other languages like C and C++ have to deal with (and thus you can actually use threads in parallel to work together on a task, unlike CPython)

Python gets around this issue by simply making multiple interpreter instances when using the multiprocessing module, and any message passing between instances is done via copying data between processes (ie the same memory is typically not touched by both interpreter instances). This does not however happen in the misleadingly named threading module, which often actually slow processes down because of a process called context switching. Threading today has limited usefullness, but provides an easier way around non GIL locked processes like socket and file reads/writes than async python.

Beyond all this though there is a bigger problem with your multiprocessing. Your writing to standard output. You aren't going to get the gains you want. Think about it. Each of your processes "print" data, but its all being displayed in one terminal/output screen. So even if your processes are "printing" they aren't really doing that independently, and the information has to be coalesced back into another processes where the text interface lies (ie your console). So these processes write whatever they were going to to some sort of buffer, which then has to be copied (as we learned from how multiprocessing works) to another process which will then take that buffered data and output it.

Typically dummy programs use printing as a means of showing how there is no order between execution of these processes, that they can finish at different times, they aren't meant to demonstrate the performance benefits of multi core processing.

Krupip
  • 4,404
  • 2
  • 32
  • 54
  • `threading` in Python (CPython) is not misleading in the slightest - it does use real system threads (pthreads, windows threads, whatever...) it's the GIL that prevents the parallel execution. In dependence of the OS scheduler any of those _threads_ can end up executing on a different core (just not two at the same time, with some rare exceptions). – zwer Jun 20 '17 at 19:06
  • @zwer its misleading because what you expect from threading doesn't actually happen, they don't act like pthreads because of the GIL. – Krupip Jun 20 '17 at 19:07
0

I have experimented a bit this week with multiprocessing. The fastest way that I discovered to do multiprocessing in python3 is using imap_unordered, at least in my scenario. Here is a script you can experiment with using your scenario to figure out what works best for you:

import multiprocessing

NUMBER_OF_PROCESSES = multiprocessing.cpu_count()
MP_FUNCTION = 'imap_unordered'  # 'imap_unordered' or 'starmap' or 'apply_async'

def process_chunk(a_chunk):
    print(f"processig mp chunk {a_chunk}")
    return a_chunk


map_jobs = [1, 2, 3, 4]

result_sum = 0

if MP_FUNCTION == 'imap_unordered':
    pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
    for i in pool.imap_unordered(process_chunk, map_jobs):
        result_sum += i
elif MP_FUNCTION == 'starmap':
    pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
    try:
        map_jobs = [(i, ) for i in map_jobs]
        result_sum = pool.starmap(process_chunk, map_jobs)
        result_sum = sum(result_sum)
    finally:
        pool.close()
        pool.join()
elif MP_FUNCTION == 'apply_async':
    with multiprocessing.Pool(processes=NUMBER_OF_PROCESSES) as pool:
        result_sum = [pool.apply_async(process_chunk, [i, ]).get() for i in map_jobs]
    result_sum = sum(result_sum)
print(f"result_sum is {result_sum}")

I found that starmap was not too far behind in performance, in my scenario it used more cpu and ended up being a bit slower. Hope this boilerplate helps.

radtek
  • 34,210
  • 11
  • 144
  • 111
0

minimal working example: ordered execution, results asap, max n threads

The following code snippets executes some functions in parallel with the following side conditions:

  • max as many threads as cpu cores
  • jobs can be executed by priority
  • results are printed as soon as available

Code

import time
import random
from typing import List, Callable, Dict, Any
import multiprocessing as mp
from multiprocessing.managers import DictProxy

N_THREADS = mp.cpu_count()

def process_single_task(task_name: str):
    n_sec = random.randint(0, 4)
    print(f"start {task_name=}, {n_sec=}")
    time.sleep(n_sec)
    print(f"end {task_name=}, {n_sec=}")
    return task_name, n_sec

def fct_to_multiprocessing(
        fct: Callable, fct_kwargs: Dict[str, Any], job_id: int, results: DictProxy, semaphore: mp.Semaphore):
    if semaphore is not None:
        semaphore.acquire()
    results[job_id] = fct(**fct_kwargs)
    if semaphore is not None:
        semaphore.release()

def process_all_tasks(tasks: List[str]):
    manager = mp.Manager()
    results = manager.dict()  # <class 'multiprocessing.managers.DictProxy'>
    sema = mp.Semaphore(N_THREADS)
    jobs = {}

    job_ids = list(range(len(tasks)))

    for job_id in job_ids:
        task = tasks[job_id]
        jobs[job_id] = mp.Process(
            target=fct_to_multiprocessing,
            kwargs={
                "fct": process_single_task, "fct_kwargs": {"task_name": task},
                "job_id": job_id, "results": results, "semaphore": sema
            }
        )
        jobs[job_id].start()

    for job_id in job_ids:
        job = jobs[job_id]
        job.join()
        result = results[job_id]
        print(f"job {tasks[job_id]} returned {result=}")


if __name__ == "__main__":
    tasks = list("abcdefghijklmnopqrstuvwxyz")
    process_all_tasks(tasks)

Output

start task_name='a', n_sec=4
start task_name='c', n_sec=2
end task_name='c', n_sec=2
start task_name='b', n_sec=2
end task_name='a', n_sec=4
start task_name='d', n_sec=1
job a returned result=('a', 4)
end task_name='b', n_sec=2
start task_name='e', n_sec=0
end task_name='e', n_sec=0
job b returned result=('b', 2)
job c returned result=('c', 2)
start task_name='f', n_sec=0
end task_name='f', n_sec=0
start task_name='j', n_sec=2
end task_name='d', n_sec=1
start task_name='g', n_sec=1
job d returned result=('d', 1)
job e returned result=('e', 0)
job f returned result=('f', 0)
end task_name='g', n_sec=1
start task_name='i', n_sec=3
job g returned result=('g', 1)
end task_name='j', n_sec=2
start task_name='h', n_sec=1
end task_name='h', n_sec=1
start task_name='o', n_sec=4
job h returned result=('h', 1)
end task_name='i', n_sec=3
start task_name='n', n_sec=2
job i returned result=('i', 3)
job j returned result=('j', 2)
end task_name='n', n_sec=2
start task_name='k', n_sec=2
end task_name='o', n_sec=4
start task_name='r', n_sec=1
end task_name='r', n_sec=1
start task_name='m', n_sec=1
end task_name='k', n_sec=2
start task_name='l', n_sec=4
job k returned result=('k', 2)
end task_name='m', n_sec=1
start task_name='s', n_sec=3
end task_name='s', n_sec=3
start task_name='p', n_sec=3
end task_name='l', n_sec=4
start task_name='q', n_sec=0
end task_name='q', n_sec=0
start task_name='t', n_sec=0
end task_name='t', n_sec=0
job l returned result=('l', 4)
job m returned result=('m', 1)
job n returned result=('n', 2)
job o returned result=('o', 4)
start task_name='u', n_sec=4
end task_name='p', n_sec=3
start task_name='v', n_sec=0
end task_name='v', n_sec=0
start task_name='x', n_sec=4
job p returned result=('p', 3)
job q returned result=('q', 0)
job r returned result=('r', 1)
job s returned result=('s', 3)
job t returned result=('t', 0)
end task_name='u', n_sec=4
start task_name='y', n_sec=4
job u returned result=('u', 4)
job v returned result=('v', 0)
end task_name='x', n_sec=4
start task_name='z', n_sec=0
end task_name='z', n_sec=0
start task_name='w', n_sec=1
end task_name='w', n_sec=1
job w returned result=('w', 1)
job x returned result=('x', 4)
end task_name='y', n_sec=4
job y returned result=('y', 4)
job z returned result=('z', 0)


** Process exited - Return Code: 0 **
Press Enter to exit terminal

Disclaimer: time.sleep(n_sec) thereby stands for some computational heavy function. If its actually just waiting, asyncio is in general a better choice (even though increasing the number of threads here should do the job as well).

Markus Dutschke
  • 9,341
  • 4
  • 63
  • 58
0

Example of multiple processing. Hopefully it will be helpful for you:

from multiprocessing import Process  

def fun_square(x):
    x_square = x**2
    print('x_square: ', x_square) 

def x_pow_y(x,y):
    x_pow_y = x**y
    print('x_pow_y: ', x_pow_y)

def fun_qube(z):
    z_qube = z*z*z
    print('z_qube: ', z_qube)
    
def normal_fun():
    print("Normal fun is working at same time...")


p1 = Process(target = fun_square, args=(5,)).start() #args=(x,)
p2 = Process(target = x_pow_y, args=(2,4,)).start() #args=(x,y,)
p3 = Process(target = fun_qube(5)).start() #fun_qube(z)
p4 = Process(target = normal_fun).start()
MD. SHIFULLAH
  • 913
  • 10
  • 16