multiprocessing template with live log of available results
I use multiprocessing to test newly developed code against massive amounts of test data, and I want results as fast as possible: if the new code fails on one of the test cases, I can start developing a fix while watching how the code performs on the rest of the test data. I can then change the order in which the test data is processed in the next run (to surface failures faster).
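A minimal sketch of that reordering step (the failed_ids set is hypothetical; in practice it would come from the previous run's results):

# hypothetical: run previously failed test cases first
failed_ids = {"c", "e"}  # assumed outcome of the last run
tasks = list("abcdef")
tasks.sort(key=lambda task: task not in failed_ids)  # False sorts before True, so failures come first
print(tasks)  # ['c', 'e', 'a', 'b', 'd', 'f']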
The following template
- executes a maximum number of processes in parallel (using a semaphore)
- collects all results in a pd.DataFrame as soon as they are available
- prints a summary as soon as a new result arrives
- offers a non-parallel mode for debugging
code
import sys
import time
import random
from typing import List, Callable, Dict, Any
import multiprocessing as mp
from multiprocessing.managers import DictProxy
import logging
import pandas as pd
N_PROC = mp.cpu_count() - 1  # max number of processes running in parallel (the rest wait for the semaphore)
MULTIPROCESSING_UPDATE_CYCLE = .1  # seconds to wait before checking again whether any job has finished
# logging
DEFAULT_FORMAT = "\n%(levelname)s - %(asctime)s.%(msecs)03d - %(filename)s, l %(lineno)d:\n%(message)s"
DEFAULT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
default_stream_handler = logging.StreamHandler(sys.stdout)
default_stream_handler.setFormatter(logging.Formatter(fmt=DEFAULT_FORMAT, datefmt=DEFAULT_TIME_FORMAT))
logger = logging.getLogger("mp_template")
logger.setLevel(logging.DEBUG)
logger.addHandler(default_stream_handler)
# fix seed
random.seed(42) # a 'not so' arbitrary number
def process_single_task(task_name: str) -> Dict:
    """
    This is the slow function you want to parallelize.

    Parameters
    ----------
    task_name : str
        some input

    Returns
    -------
    Dict :
        Dictionary of the different values produced during execution.
        This is overengineered for this example, but pretty handy for more complex functions.
    """
    result = {}
    n_sec = random.randint(1, 4)
    logger.debug(f"start {task_name=}, {n_sec=}")
    time.sleep(n_sec)
    logger.debug(f"end {task_name=}, {n_sec=}")
    result['n_sec'] = n_sec
    result['log'] = f"executed {task_name=}"
    return result

def fct_to_multiprocessing(
        fct: Callable, fct_kwargs: Dict[str, Any], job_id: str, results: DictProxy, semaphore: mp.Semaphore):
    """
    Function for limiting the number of active processes and managing each process's return value.

    Parameters
    ----------
    fct : Callable
        Function to execute in a separate process
    fct_kwargs : Dict[str, Any]
        kwargs for fct
    job_id : str
        id to manage results. The result is stored in results[job_id]
    results : DictProxy
        special mp dict to manage the return values of fct
    semaphore : mp.Semaphore
        semaphore object to prevent more than N_PROC processes running in parallel

    Example
    -------
    Use as follows:

        manager = mp.Manager()
        results = manager.dict()
        sema = mp.Semaphore(N_PROC)
        jobs = {}
        for job_id in ...:
            jobs[job_id] = mp.Process(
                target=fct_to_multiprocessing,
                kwargs={
                    "fct": ..., "fct_kwargs": {...},
                    "job_id": job_id, "results": results, "semaphore": sema
                }
            )
            jobs[job_id].start()
    """
    if semaphore is not None:
        semaphore.acquire()
    try:
        results[job_id] = fct(**fct_kwargs)
    finally:  # release the semaphore even if fct raises
        if semaphore is not None:
            semaphore.release()

def manage_results(df_results: pd.DataFrame, job_id: str, result: Dict) -> pd.DataFrame:
    # new keys in result simply create new columns in df_results
    df_results.loc[job_id, list(result.keys())] = list(result.values())
    logger.info(df_results)
    return df_results

def process_all_tasks(tasks: List[str]):
    logger.info(f"\n\n{''.center(80, '=')}\n{' started '.center(80, '=')}\n{''.center(80, '=')}\n")
    logger.info(f"executing code on {N_PROC} / {mp.cpu_count()} simultaneous processes")
    job_ids = [f"job_id={task}" for task in tasks]
    df_results = pd.DataFrame(index=job_ids)

    # run jobs
    if N_PROC == 1:  # no parallelization, good for debugging
        for job_id, task in zip(job_ids, tasks):
            result = process_single_task(task_name=task)
            df_results = manage_results(df_results=df_results, job_id=job_id, result=result)
    else:  # parallelization on
        manager = mp.Manager()
        results = manager.dict()
        sema = mp.Semaphore(N_PROC)
        jobs = {}
        for job_id, task in zip(job_ids, tasks):
            jobs[job_id] = mp.Process(
                target=fct_to_multiprocessing,
                kwargs={
                    "fct": process_single_task, "fct_kwargs": {"task_name": task},
                    "job_id": job_id, "results": results, "semaphore": sema
                }
            )
            jobs[job_id].start()

        while jobs:  # as soon as a job is completed, add its result to df_results
            for job_id in jobs.keys():
                job = jobs[job_id]
                if job.exitcode is not None:  # this job has completed
                    job.join()
                    result = results[job_id]
                    job.close()
                    del jobs[job_id]  # safe despite iterating over jobs: we break right afterwards
                    df_results = manage_results(df_results=df_results, job_id=job_id, result=result)
                    break
            time.sleep(MULTIPROCESSING_UPDATE_CYCLE)

    logger.info(f"\n\n{''.center(80, '=')}\n{' finished '.center(80, '=')}\n{''.center(80, '=')}\n")
    logger.info(df_results)


if __name__ == "__main__":
    tasks = list("abcdef")
    process_all_tasks(tasks)
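Note that the template assumes fct never raises: an uncaught exception makes the worker exit with a nonzero exitcode, results[job_id] is never set, and the main loop then crashes with a KeyError. A minimal sketch of a drop-in variant that records the failure instead (the 'error' key is my own convention, not part of the template above):

def fct_to_multiprocessing_safe(
        fct: Callable, fct_kwargs: Dict[str, Any], job_id: str, results: DictProxy, semaphore: mp.Semaphore):
    """Like fct_to_multiprocessing, but records exceptions instead of letting the worker die silently."""
    if semaphore is not None:
        semaphore.acquire()
    try:
        results[job_id] = fct(**fct_kwargs)
    except Exception as err:  # record the failure so it shows up as an 'error' column in df_results
        results[job_id] = {"error": repr(err)}
    finally:
        if semaphore is not None:
            semaphore.release()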
output
$ python 230315_multiprocessing_template.py
INFO - 2023-03-15T10:51:09.786 - 230315_multiprocessing_template.py, l 98:
================================================================================
=================================== started ====================================
================================================================================
INFO - 2023-03-15T10:51:09.786 - 230315_multiprocessing_template.py, l 99:
executing code on 3 / 4 simultaneous processes
DEBUG - 2023-03-15T10:51:09.794 - 230315_multiprocessing_template.py, l 43:
start task_name='a', n_sec=2
DEBUG - 2023-03-15T10:51:09.794 - 230315_multiprocessing_template.py, l 43:
start task_name='b', n_sec=2
DEBUG - 2023-03-15T10:51:09.796 - 230315_multiprocessing_template.py, l 43:
start task_name='c', n_sec=1
DEBUG - 2023-03-15T10:51:10.797 - 230315_multiprocessing_template.py, l 45:
end task_name='c', n_sec=1
DEBUG - 2023-03-15T10:51:10.798 - 230315_multiprocessing_template.py, l 43:
start task_name='d', n_sec=1
INFO - 2023-03-15T10:51:10.901 - 230315_multiprocessing_template.py, l 94:
n_sec log
job_id=a NaN NaN
job_id=b NaN NaN
job_id=c 1.0 executed task_name='c'
job_id=d NaN NaN
job_id=e NaN NaN
job_id=f NaN NaN
DEBUG - 2023-03-15T10:51:11.796 - 230315_multiprocessing_template.py, l 45:
end task_name='a', n_sec=2
DEBUG - 2023-03-15T10:51:11.796 - 230315_multiprocessing_template.py, l 45:
end task_name='b', n_sec=2
DEBUG - 2023-03-15T10:51:11.797 - 230315_multiprocessing_template.py, l 43:
start task_name='f', n_sec=2
DEBUG - 2023-03-15T10:51:11.798 - 230315_multiprocessing_template.py, l 43:
start task_name='e', n_sec=1
DEBUG - 2023-03-15T10:51:11.798 - 230315_multiprocessing_template.py, l 45:
end task_name='d', n_sec=1
INFO - 2023-03-15T10:51:11.807 - 230315_multiprocessing_template.py, l 94:
n_sec log
job_id=a 2.0 executed task_name='a'
job_id=b NaN NaN
job_id=c 1.0 executed task_name='c'
job_id=d NaN NaN
job_id=e NaN NaN
job_id=f NaN NaN
INFO - 2023-03-15T10:51:11.910 - 230315_multiprocessing_template.py, l 94:
n_sec log
job_id=a 2.0 executed task_name='a'
job_id=b 2.0 executed task_name='b'
job_id=c 1.0 executed task_name='c'
job_id=d NaN NaN
job_id=e NaN NaN
job_id=f NaN NaN
INFO - 2023-03-15T10:51:12.014 - 230315_multiprocessing_template.py, l 94:
n_sec log
job_id=a 2.0 executed task_name='a'
job_id=b 2.0 executed task_name='b'
job_id=c 1.0 executed task_name='c'
job_id=d 1.0 executed task_name='d'
job_id=e NaN NaN
job_id=f NaN NaN
DEBUG - 2023-03-15T10:51:12.799 - 230315_multiprocessing_template.py, l 45:
end task_name='e', n_sec=1
INFO - 2023-03-15T10:51:12.819 - 230315_multiprocessing_template.py, l 94:
n_sec log
job_id=a 2.0 executed task_name='a'
job_id=b 2.0 executed task_name='b'
job_id=c 1.0 executed task_name='c'
job_id=d 1.0 executed task_name='d'
job_id=e 1.0 executed task_name='e'
job_id=f NaN NaN
DEBUG - 2023-03-15T10:51:13.800 - 230315_multiprocessing_template.py, l 45:
end task_name='f', n_sec=2
INFO - 2023-03-15T10:51:13.824 - 230315_multiprocessing_template.py, l 94:
n_sec log
job_id=a 2.0 executed task_name='a'
job_id=b 2.0 executed task_name='b'
job_id=c 1.0 executed task_name='c'
job_id=d 1.0 executed task_name='d'
job_id=e 1.0 executed task_name='e'
job_id=f 2.0 executed task_name='f'
INFO - 2023-03-15T10:51:13.927 - 230315_multiprocessing_template.py, l 140:
================================================================================
=================================== finished ===================================
================================================================================
INFO - 2023-03-15T10:51:13.927 - 230315_multiprocessing_template.py, l 141:
n_sec log
job_id=a 2.0 executed task_name='a'
job_id=b 2.0 executed task_name='b'
job_id=c 1.0 executed task_name='c'
job_id=d 1.0 executed task_name='d'
job_id=e 1.0 executed task_name='e'
job_id=f 2.0 executed task_name='f'
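For comparison: since Python 3.2 the standard library offers the same process-results-as-they-arrive pattern with less bookkeeping. A rough sketch using concurrent.futures, reusing process_single_task, manage_results and N_PROC from the listing above (not a strict drop-in replacement: the executor reuses worker processes, queues excess tasks itself so no semaphore is needed, and surfaces exceptions via future.result()):

import concurrent.futures as cf

def process_all_tasks_with_pool(tasks: List[str]) -> pd.DataFrame:
    # the executor keeps at most N_PROC workers busy; remaining tasks wait in its queue
    df_results = pd.DataFrame(index=[f"job_id={task}" for task in tasks])
    with cf.ProcessPoolExecutor(max_workers=N_PROC) as pool:
        future_to_job_id = {
            pool.submit(process_single_task, task_name=task): f"job_id={task}"
            for task in tasks
        }
        for future in cf.as_completed(future_to_job_id):  # yields futures in completion order
            job_id = future_to_job_id[future]
            df_results = manage_results(df_results=df_results, job_id=job_id, result=future.result())
    return df_results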