I always use Python's standard-library `multiprocessing` module to handle parallelism. To control the number of processes in the queue, I use a shared variable as a counter. The following example shows how the parallel execution of simple processes works.
I updated the script to make it easier to use. Basically, the only thing you have to do is override the `process` method with the function you want to run in parallel. See the example: the procedure is very simple. Optionally, you can also remove all the execution-logging calls.
When I have some time, I'll update the code to work with processes that return values.
Requirements
user@host:~$ pip install coloredlogs==15.0.1
Code
Parallel processing script (copy and paste):
#!/usr/bin/env python
# encoding: utf-8
from multiprocessing import Manager, Pool, Value, cpu_count
from multiprocessing.pool import ThreadPool
from typing import Any, Iterator
from datetime import datetime
from logging import Logger
import coloredlogs
import logging
import time
import sys
import os
LOG_LEVEL = "DEBUG"  # Default level used by `get_logger`.


def get_logger(name: str = __name__, level: str = LOG_LEVEL) -> Logger:
    """
    Return a logger with colored console output.

    Parameters
    ----------
    name: str
        Name of the logger to fetch with `logging.getLogger`.
    level: str
        Logging level name; one of "NOTSET", "DEBUG", "INFO", "WARNING",
        "ERROR" or "CRITICAL".

    Returns
    -------
    Logger
        The named logger, with `coloredlogs` installed on it at `level`.

    Raises
    ------
    ValueError
        If `level` is not one of the accepted level names.
    """
    valid_levels = ("NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
    # Explicit check instead of `assert`: asserts are stripped under `python -O`.
    if level not in valid_levels:
        raise ValueError(
            f"Invalid log level {level!r}; expected one of: {', '.join(valid_levels)}."
        )
    # Setting-up the script logging (basicConfig is a no-op once the root
    # logger already has handlers, so repeated calls are harmless):
    logging.basicConfig(
        stream=sys.stdout,
        format="%(asctime)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        level=level,
    )
    logger = logging.getLogger(name)
    coloredlogs.install(level=level, logger=logger, isatty=True)
    return logger
class ParallelProcessing:
    """
    Parallel processing.

    Runs `process` once per entry of an argument list on a process pool (or a
    thread pool). A counter shared through a `multiprocessing.Manager` keeps at
    most `n_jobs` jobs submitted at any time; every update of that counter is
    done under a Manager lock, so the parent and the workers never lose updates.

    Subclasses must override `process`.

    References
    ----------
    [1] Class `ParallelProcessing`: https://stackoverflow.com/a/70464369/16109419

    Examples
    --------
    >>> class MyParallelProcessing(ParallelProcessing):
    ...     def process(self, name: str) -> None:
    ...         logger = get_logger()
    ...         logger.info(f"Executing process: {name}...")
    ...         time.sleep(5)
    ...
    >>> params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]
    >>> mpp = MyParallelProcessing()
    >>> mpp.run(args_list=params_list)
    """

    _n_jobs: int  # Maximum number of jobs submitted at the same time.
    _waiting_time: int  # Seconds to sleep while every job slot is taken.
    _queue: Value  # Shared counter (Manager value proxy) of jobs currently in flight.
    _lock: Any  # Manager lock proxy guarding every read-modify-write of `_queue`.
    _logger: Logger

    def __init__(self, n_jobs: int = -1, waiting_time: int = 1):
        """
        Instantiates a parallel processing object to execute processes in parallel.

        Parameters
        ----------
        n_jobs: int
            Maximum number of jobs running at the same time. Any value below 1
            (the default is -1) means `cpu_count()`.
        waiting_time: int
            Seconds to wait before re-checking when the jobs queue is full,
            i.e. `_queue.value == _n_jobs`. A negative value means one hour.
        """
        # `Pool(processes=0)` raises ValueError, so 0 falls back to cpu_count() too.
        self._n_jobs = n_jobs if n_jobs > 0 else cpu_count()
        self._waiting_time = waiting_time if waiting_time >= 0 else 60 * 60
        self._logger = get_logger()

    def process(self, *args) -> None:
        """
        Abstract process that must be overridden.

        Parameters
        ----------
        *args
            Parameters of the process to be executed.
        """
        raise NotImplementedError("Process not defined ('NotImplementedError' exception).")

    def _execute(self, *args) -> None:
        """
        Run the process, then release its slot by decreasing the shared job counter.

        The release happens in `finally`, so a failing process still frees its
        slot. It is done under `_lock` so that it cannot interleave with the
        check-and-increment done by `_try_reserve_slot`.

        Parameters
        ----------
        *args
            Parameters of the process to be executed.
        """
        try:
            self.process(*args)
        finally:
            with self._lock:
                self._queue.value -= 1

    def _try_reserve_slot(self) -> bool:
        """
        Atomically claim a free job slot.

        Returns
        -------
        bool
            True if the counter was below `_n_jobs` and has been incremented;
            False if every slot is currently taken.
        """
        with self._lock:
            if self._queue.value < self._n_jobs:
                self._queue.value += 1
                return True
        return False

    def _error_callback(self, result: Any) -> None:
        """
        Error callback: log the exception raised by a job and abort the program.

        `os._exit(1)` ends the current process immediately, without running
        cleanup handlers.

        Parameters
        ----------
        result: Any
            The exception raised by the failed job (as passed by the pool).
        """
        # Passing the exception as `exc_info` also logs its traceback.
        self._logger.error(result, exc_info=result)
        os._exit(1)

    def run(self, args_list: Iterator[tuple], use_multithreading: bool = False) -> None:
        """
        Run processes in parallel.

        Parameters
        ----------
        args_list: Iterator[tuple]
            Iterable of process parameters; each tuple is unpacked as `*args`.
        use_multithreading: bool
            Use multithreading instead of multiprocessing.
        """
        # The context managers shut the Manager server down and clean the pool
        # up even if submission raises.
        with Manager() as manager:
            self._queue = manager.Value('i', 0)
            self._lock = manager.Lock()
            pool_class = ThreadPool if use_multithreading else Pool
            with pool_class(processes=self._n_jobs) as pool:
                start_time = datetime.now()
                for args in args_list:
                    # Sleep outside the lock so workers can release their slots meanwhile.
                    while not self._try_reserve_slot():
                        self._logger.debug(f"Pool full ({self._n_jobs}): waiting {self._waiting_time} seconds...")
                        time.sleep(self._waiting_time)
                    # Running processes in parallel:
                    pool.apply_async(func=self._execute, args=args, error_callback=self._error_callback)
                pool.close()
                pool.join()
        exec_time = datetime.now() - start_time
        self._logger.info(f"Execution time: {exec_time}")
Example of use:
class MyParallelProcessing(ParallelProcessing):
    """Demo subclass: every job logs its name, then sleeps for five seconds."""

    def process(self, name: str) -> None:
        """
        Job body executed by a pool worker (overrides the abstract `process`).

        Parameters
        ----------
        name: str
            Label of the job, echoed in the log message.
        """
        get_logger().info(f"Executing process: {name}...")
        time.sleep(5)
def main() -> None:
    """
    Run the demo: six dummy jobs ("A" to "F") through `MyParallelProcessing`.

    The optional first command-line argument is the number of jobs to run in
    parallel. When it is omitted, -1 is passed, which `ParallelProcessing`
    maps to `cpu_count()`.
    """
    # Number of jobs to run in parallel (optional; avoids IndexError when missing).
    n_jobs = int(sys.argv[1]) if len(sys.argv) > 1 else -1
    params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]
    mpp = MyParallelProcessing(n_jobs=n_jobs)
    # Executing processes in parallel:
    mpp.run(args_list=params_list)


if __name__ == '__main__':
    main()
Execution and Output
user@host:~$ python run.py 1
2021-12-23 12:41:51 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:51 MYMACHINE __mp_main__[12352] INFO Executing process: A...
2021-12-23 12:41:52 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:53 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:54 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:55 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:56 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __mp_main__[12352] INFO Executing process: B...
2021-12-23 12:41:58 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:59 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:00 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
...
2021-12-23 12:42:10 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:11 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __mp_main__[12352] INFO Executing process: E...
2021-12-23 12:42:13 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:14 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:15 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:16 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:18 MYMACHINE __mp_main__[12352] INFO Executing process: F...
2021-12-23 12:42:23 MYMACHINE __main__[24180] INFO Execution time: 0:00:31.274478
user@host:~$ python run.py 3
2021-12-23 12:33:59 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:33:59 MYMACHINE __mp_main__[19776] INFO Executing process: A...
2021-12-23 12:33:59 MYMACHINE __mp_main__[24632] INFO Executing process: B...
2021-12-23 12:33:59 MYMACHINE __mp_main__[15852] INFO Executing process: C...
2021-12-23 12:34:00 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:01 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:02 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:03 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:04 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:05 MYMACHINE __mp_main__[19776] INFO Executing process: D...
2021-12-23 12:34:05 MYMACHINE __mp_main__[24632] INFO Executing process: E...
2021-12-23 12:34:05 MYMACHINE __mp_main__[15852] INFO Executing process: F...
2021-12-23 12:34:10 MYMACHINE __main__[7628] INFO Execution time: 0:00:11.087672
user@host:~$ python run.py 6
2021-12-23 12:40:48 MYMACHINE __mp_main__[26312] INFO Executing process: A...
2021-12-23 12:40:48 MYMACHINE __mp_main__[11468] INFO Executing process: B...
2021-12-23 12:40:48 MYMACHINE __mp_main__[12000] INFO Executing process: C...
2021-12-23 12:40:48 MYMACHINE __mp_main__[19864] INFO Executing process: D...
2021-12-23 12:40:48 MYMACHINE __mp_main__[25356] INFO Executing process: E...
2021-12-23 12:40:48 MYMACHINE __mp_main__[14504] INFO Executing process: F...
2021-12-23 12:40:53 MYMACHINE __main__[1180] INFO Execution time: 0:00:05.295934