minimal working example: ordered execution, results asap, max n threads
The following code snippet executes some functions in parallel, subject to these conditions:
- at most as many concurrent worker processes as there are CPU cores
- jobs can be executed by priority
- results are printed as soon as available
Code
import time
import random
from typing import List, Callable, Dict, Any
import multiprocessing as mp
from multiprocessing.managers import DictProxy
N_THREADS = mp.cpu_count()
def process_single_task(task_name: str, max_sec: int = 4):
    """Simulate one computationally heavy job by sleeping a random duration.

    Args:
        task_name: Label for this job; echoed in the log lines and returned.
        max_sec: Inclusive upper bound for the random sleep in seconds.
            Defaults to 4, preserving the original hard-coded behavior.

    Returns:
        Tuple ``(task_name, n_sec)`` so the caller can match a result back
        to the job that produced it.
    """
    n_sec = random.randint(0, max_sec)
    print(f"start {task_name=}, {n_sec=}")
    time.sleep(n_sec)  # stand-in for real CPU-bound work
    print(f"end {task_name=}, {n_sec=}")
    return task_name, n_sec
def fct_to_multiprocessing(
        fct: Callable, fct_kwargs: Dict[str, Any], job_id: int, results: DictProxy, semaphore: mp.Semaphore):
    """Worker-process wrapper: gate concurrency and publish the result.

    Acquires ``semaphore`` (when given) so at most N wrapped functions run
    at once, calls ``fct(**fct_kwargs)``, and stores the return value in the
    shared ``results`` mapping under ``job_id``.

    Bug fix: the semaphore release now sits in a ``finally`` block. In the
    original, an exception inside ``fct`` skipped the release, permanently
    consuming one concurrency slot and eventually deadlocking every job
    still waiting to acquire.

    Args:
        fct: The function to execute in this process.
        fct_kwargs: Keyword arguments forwarded to ``fct``.
        job_id: Key under which the result is stored in ``results``.
        results: Shared manager dict (DictProxy) visible to the parent.
        semaphore: Concurrency limiter, or ``None`` for unlimited.
    """
    if semaphore is not None:
        semaphore.acquire()
    try:
        results[job_id] = fct(**fct_kwargs)
    finally:
        # Always free the slot, even when fct raises.
        if semaphore is not None:
            semaphore.release()
def process_all_tasks(tasks: List[str]):
    """Launch one worker process per task and report results in task order.

    All processes are started immediately; the shared semaphore inside
    ``fct_to_multiprocessing`` keeps at most ``N_THREADS`` of them doing
    actual work at any time. Results are collected from a manager dict and
    printed in submission order as each job finishes.

    Args:
        tasks: Task names handed to ``process_single_task``.
    """
    manager = mp.Manager()
    results = manager.dict()  # DictProxy shared between parent and workers
    sema = mp.Semaphore(N_THREADS)

    workers = {}
    for idx, task in enumerate(tasks):
        proc = mp.Process(
            target=fct_to_multiprocessing,
            kwargs={
                "fct": process_single_task, "fct_kwargs": {"task_name": task},
                "job_id": idx, "results": results, "semaphore": sema
            }
        )
        workers[idx] = proc
        proc.start()

    # Join in submission order so each result is printed as soon as its
    # job (and every earlier one) has completed.
    for idx, proc in workers.items():
        proc.join()
        result = results[idx]
        print(f"job {tasks[idx]} returned {result=}")
if __name__ == "__main__":
    # One single-letter task per lowercase ASCII letter: "a" through "z".
    tasks = [chr(code) for code in range(ord("a"), ord("z") + 1)]
    process_all_tasks(tasks)
Output
start task_name='a', n_sec=4
start task_name='c', n_sec=2
end task_name='c', n_sec=2
start task_name='b', n_sec=2
end task_name='a', n_sec=4
start task_name='d', n_sec=1
job a returned result=('a', 4)
end task_name='b', n_sec=2
start task_name='e', n_sec=0
end task_name='e', n_sec=0
job b returned result=('b', 2)
job c returned result=('c', 2)
start task_name='f', n_sec=0
end task_name='f', n_sec=0
start task_name='j', n_sec=2
end task_name='d', n_sec=1
start task_name='g', n_sec=1
job d returned result=('d', 1)
job e returned result=('e', 0)
job f returned result=('f', 0)
end task_name='g', n_sec=1
start task_name='i', n_sec=3
job g returned result=('g', 1)
end task_name='j', n_sec=2
start task_name='h', n_sec=1
end task_name='h', n_sec=1
start task_name='o', n_sec=4
job h returned result=('h', 1)
end task_name='i', n_sec=3
start task_name='n', n_sec=2
job i returned result=('i', 3)
job j returned result=('j', 2)
end task_name='n', n_sec=2
start task_name='k', n_sec=2
end task_name='o', n_sec=4
start task_name='r', n_sec=1
end task_name='r', n_sec=1
start task_name='m', n_sec=1
end task_name='k', n_sec=2
start task_name='l', n_sec=4
job k returned result=('k', 2)
end task_name='m', n_sec=1
start task_name='s', n_sec=3
end task_name='s', n_sec=3
start task_name='p', n_sec=3
end task_name='l', n_sec=4
start task_name='q', n_sec=0
end task_name='q', n_sec=0
start task_name='t', n_sec=0
end task_name='t', n_sec=0
job l returned result=('l', 4)
job m returned result=('m', 1)
job n returned result=('n', 2)
job o returned result=('o', 4)
start task_name='u', n_sec=4
end task_name='p', n_sec=3
start task_name='v', n_sec=0
end task_name='v', n_sec=0
start task_name='x', n_sec=4
job p returned result=('p', 3)
job q returned result=('q', 0)
job r returned result=('r', 1)
job s returned result=('s', 3)
job t returned result=('t', 0)
end task_name='u', n_sec=4
start task_name='y', n_sec=4
job u returned result=('u', 4)
job v returned result=('v', 0)
end task_name='x', n_sec=4
start task_name='z', n_sec=0
end task_name='z', n_sec=0
start task_name='w', n_sec=1
end task_name='w', n_sec=1
job w returned result=('w', 1)
job x returned result=('x', 4)
end task_name='y', n_sec=4
job y returned result=('y', 4)
job z returned result=('z', 0)
** Process exited - Return Code: 0 **
Press Enter to exit terminal
Disclaimer: time.sleep(n_sec) here stands in for some computationally heavy function. If it is
actually just waiting, asyncio is in general a better choice (even though increasing the number
of threads here should do the job as well).