Overview, architecture & some practical tips
From my own (also limited) experience I can share the following insights on how multiprocessing works and how to use it. I didn't find the python.org manuals very descriptive or graphic, so I read the code. For everybody who had the same impression... this is what I could make of it so far:
General good/best practice tips
- General implementation methods:
  - test-driven with reduced data sizes: you don't want to wonder for minutes whether the run crashed or is still calculating
  - stepwise & with time profiling:
    - first, implement & debug without multiprocessing
    - next, implement & debug with a single process, profile the time & compare the overhead against the version without multiprocessing
    - next, increase the process number & profile the time to identify any GIL issues and waiting times
- Simple `Process`es or lists of them are useful to run a few functions one-by-one (one function per process). `Pool`s handle the distribution of batchable workloads (high-level tasks/commands) between a set number of `Process`es (a pool of processes).
- Use `Pool` for processor-bound tasks (high processor load with batchable inputs/outputs) and `pool.ThreadPool` for IO-bound tasks (low processor load with separate inputs/outputs).
- For data transfer between `Process`es, `Pool`s, `Thread`s and `ThreadPool`s, use `queues.Queue` and its subclasses (if result order matters) or `Pipe`s with a 1-to-1 mapping of the `PipeConnection`s to the processes or threads.
- For sharing variables of different types (`BaseProxy`, `Namespace`s, `Queue`s, `Pool`s) or for setting up synchronization objects like `Barrier`/`Lock`/`RLock`/`Semaphore`/`Condition`s between different processes, use the `Manager` class.
- In case GIL issues cannot be avoided, use the `Manager` to handle them and try to separate the intense calculation processes from the GIL-related calculations (e.g. parsing of complex data structures, etc.), and connect them with `Pipe`s or shared `Queue`s.
- Multiple `Pool`s can be used to assign different numbers of processes to different tasks. Otherwise just implement one `Pool` with multiple mapping or apply method calls.
- Sequential parallel computing tasks building on each other's intermediate results can be calculated with a single `Pool()` and multiple `Pool.(star)map_async()` or `Pool.(star)map()` calls. For synchronizing the tasks with each other, the `ApplyResult()` instance returned by the mapping functions, with its methods `ApplyResult().ready()/.wait()/.get()/.successful()`, is the right choice (see the sketch right after this list).
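As a rough illustration of the last two points (`Pool` vs. `pool.ThreadPool`, and the returned result object for synchronizing sequential tasks), here is a minimal sketch; `cpu_heavy` and `io_heavy` are made-up stand-in functions:

```python
import time
import multiprocessing as mp
from multiprocessing.pool import ThreadPool


def cpu_heavy(n):
    # stand-in for a processor-bound, batchable calculation
    return sum(i * i for i in range(n))


def io_heavy(name):
    # stand-in for an IO-bound task (e.g. a download); sleeping releases the GIL
    time.sleep(1)
    return name


if __name__ == '__main__':
    # processor-bound -> Pool of processes
    with mp.Pool(processes=4) as pool:
        stage_1 = pool.map_async(cpu_heavy, [1_000, 2_000, 3_000])
        stage_1.wait()                                   # synchronize: block until stage 1 is done
        print(stage_1.ready(), stage_1.successful())     # True True
        # second task building on the intermediate results of the first one
        stage_2 = pool.map_async(cpu_heavy, [r % 5_000 for r in stage_1.get()])
        print(stage_2.get())                             # .get() implies .wait()

    # IO-bound -> ThreadPool with the same API
    with ThreadPool(processes=8) as tpool:
        print(tpool.map(io_heavy, ['a', 'b', 'c']))
```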
Architecture and process flow
- When `import multiprocessing` is run, `_current_process = MainProcess()` is initialized, which is a subclass of `BaseProcess` but without `target`, `args`, `kwargs` and `_parent_pid`; it is basically a handle object for all other `Process`es in the already running python kernel that imports `multiprocessing`.
- `pool.ThreadPool` is an analogous API to `Pool` which probably also shares a similar architecture.
- `Pool` is based on 3 daemon threads `Pool._task_handler`, `Pool._worker_handler` & `Pool._result_handler` which connect with 1 internal `queue.Queue()` `Pool._taskqueue` and 2 internal `SimpleQueue`s `Pool._inqueue` and `Pool._outqueue`.
- `Pool._cache` is a dictionary holding the `ApplyResult` & subclass instances from all `Pool.apply_async()/._map_async()` and submethod calls, with the global `ApplyResult._job` from `job_counter()` as key.
- The `ApplyResult`s & subclasses of a `Pool` are found either in `Pool._cache` or as the return value of `Pool.apply_async()/._map_async()` & submethods.
- The difference between `Pool.map()` and `Pool.map_async()` is that `Pool.map() == Pool.map_async().get()`, which forces/locks the main process to wait for all results to be calculated and stored in the return object `ApplyResult()`.
- The `Queue`s/`SimpleQueue`s in `Pool`:
  - `Pool._taskqueue`: pipes the high-level job of `Pool.apply_async()/.map_async()`/etc., chopped into task batches, from the apply method to the `Pool._task_handler`.
  - `Pool._inqueue`: pipes the job as a batchwise ?iterator? from the `Pool._task_handler` to the `Pool._pool.Process(target=worker, ...)`.
  - `Pool._outqueue`: pipes the results from the `Pool._pool.Process(target=worker, ...)` (initialized by `Pool._worker_handler`) to the `Pool._result_handler`, which again `_set()`s them in the `ApplyResult`s cached in `Pool._cache[self._job]`.
- The `ApplyResult` will hold the results as a list if the target `func` returns objects. Otherwise the `ApplyResult()` is just the handle for the synchronization methods, i.e. the result status call methods.
- For connecting processes and threads, 4 classes are offered, from high to simple functionality, in the following order: `queues.JoinableQueue`, `queues.Queue`, `SimpleQueue`, and `Pipe`/`PipeConnection`. `Pipe` is just a method returning 2 instances of the actual `PipeConnection` class (see the sketch below for the two simplest options).
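A minimal sketch of those two simplest options, a `Pipe` (two `PipeConnection` ends, mapped 1-to-1 to one producer and one consumer) vs. a shared `Queue` (several producers); `pipe_worker` and `queue_worker` are made up for illustration:

```python
import multiprocessing as mp


def pipe_worker(conn):
    # 1-to-1: exactly this process writes into its end of the Pipe
    conn.send('hello over the pipe')
    conn.close()


def queue_worker(q, i):
    # many-to-one: several processes can put into the same Queue
    q.put((i, i * i))


if __name__ == '__main__':
    # Pipe() returns the two PipeConnection ends
    pipe_in, pipe_out = mp.Pipe()
    p = mp.Process(target=pipe_worker, args=(pipe_in,))
    p.start()
    print(pipe_out.recv())                      # blocks until the child has sent
    p.join()

    # Queue shared by several producer processes
    queue = mp.Queue()
    producers = [mp.Process(target=queue_worker, args=(queue, i)) for i in range(3)]
    for producer in producers:
        producer.start()
    results = [queue.get() for _ in producers]  # arrival order, not submission order
    for producer in producers:
        producer.join()
    print(results)
```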
Some code examples
import logging
import multiprocessing as mp
import random
import time
import numpy as np
from copy import deepcopy
MODEL_INPUTS = ["input_ids", "mc_token_ids", "lm_labels", "mc_labels", "token_type_ids"]
mp.log_to_stderr(level=logging.INFO)  # or mp.log_to_stderr(level=logging.DEBUG)
logger = mp.get_logger()
logger.setLevel(level=logging.INFO)  # or logger.setLevel(level=logging.DEBUG)
def secs2hms(seconds, num_decimals=4):
    minutes, secs = divmod(int(seconds), 60)
    hours, minutes = divmod(minutes, 60)
    hms_time = [hours, minutes, secs]
if hasattr(seconds, '__round__'):
hms_time[-1] += seconds.__round__(num_decimals) - int(seconds)
return hms_time
class Timer():
def __init__(self, time_name, log_method=print, time_format='hms', hms_decimals=4):
self.time_name = time_name
        self.output_method = log_method
self.time_format = time_format
self.hms_decimals = hms_decimals
self.start_time = time.time()
def start(self):
raise RuntimeError('Timer was already started at initialization.')
def stop(self, *args):
seconds_time = time.time() - self.start_time
time_name = self.time_name.format(*args)
if self.time_format == 'hms':
hms_time = secs2hms(seconds=seconds_time, num_decimals=self.hms_decimals)
hms_time = ' '.join([text.format(dt) for dt, text in zip(hms_time, ['{}h', '{}min', '{}sec']) if dt > 0])
self.output_method('{} = {}'.format(time_name, hms_time))
else:
self.output_method('{} = {}sec'.format(time_name, seconds_time))
self._delete_timer()
def _delete_timer(self):
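        # Note: `del self` only removes the local reference inside this method; the Timer object itself is not freed here.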
del self
def get_log_method(method_name):
if method_name == 'debug':
log_method = logger.debug
elif method_name == 'info':
log_method = logger.info
else:
log_method = print
return log_method
def _generate_random_array(shape):
return np.array([[[random.randint(0, 1000)
for _ in range(shape[2])]
for _ in range(shape[1])]
for _ in range(shape[0])])
def random_piped_array(shape, pipe_in, log_method_name='print', log_name='RANDOM'):
log_method = get_log_method(method_name=log_method_name)
array = _generate_random_array(shape=shape)
    log_method('{}: sending `array` through `pipe_in`'.format(log_name))
pipe_in.send(array)
def random_array(shape, log_method_name='print', log_name='RANDOM'):
log_method = get_log_method(method_name=log_method_name)
assert len(shape) == 3
array = _generate_random_array(shape=shape)
log_method('{}: append `array` to `shared_array`'.format(log_name))
# for dataset_name in ['train', 'valid']:
# shared_arrays[dataset_name].append(array)
return array
def random_shared_array(shape, shared_arrays, log_method_name='print', log_name='SHARED_RANDOM'):
log_method = get_log_method(method_name=log_method_name)
assert len(shape) == 3
array = _generate_random_array(shape=shape)
log_method('{}: append `array` to `shared_array`'.format(log_name))
shared_arrays.append(array)
def random_nested_array(shape, nested_shared_arrays, dataset_name, log_method_name='print', log_name='NESTED_RANDOM'):
log_method = get_log_method(method_name=log_method_name)
log_method('{}: appending array to shared_arrays[\'{}\']'.format(log_name, dataset_name))
assert len(shape) == 3
array = _generate_random_array(shape=shape)
    log_method('{}: appending `array` to `shared_array` with currently len(nested_shared_array[\'{}\']) = {}'.format(
log_name, dataset_name, len(nested_shared_arrays[dataset_name])))
nested_shared_arrays[dataset_name].append(array)
def nested_dict_list_deepcopy(nested_shared_arrays):
"""No hierachical switching between mp.manager.BaseProxy and unshared elements"""
nested_unshared_arrays = dict()
for key, shared_list in nested_shared_arrays.items():
nested_unshared_arrays[key] = deepcopy(shared_list)
return nested_unshared_arrays
def log_arrays_state(arrays, log_method_name='print', log_name='ARRAY_STATE'):
log_method = get_log_method(method_name=log_method_name)
    log_method('{}: type(arrays) = {}'.format(log_name, type(arrays)))
try:
if hasattr(arrays, '__len__'):
log_method('{}: len(arrays) = {}'.format(log_name, len(arrays)))
if len(arrays) < 20:
for idx, array in enumerate(arrays):
log_method('{}: type(arrays[{}]) = {}'.format(log_name, idx, type(array)))
if hasattr(array, 'shape'):
log_method('{}: arrays[{}].shape = {}'.format(log_name, idx, array.shape))
else:
log_method('{}: arrays[{}] has not `shape` attribute'.format(log_name, idx))
else:
log_method('{}: array has no `__len__` method'.format(log_name))
except BrokenPipeError as error_msg:
log_method('{}: BrokenPipeError: {}'.format(log_name, error_msg))
def log_nested_arrays_state(nested_arrays, log_method_name='print', log_name='NESTED_ARRAY_STATE'):
log_method = get_log_method(method_name=log_method_name)
log_method('{}: type(arrays) = {}'.format(log_name, type(nested_arrays)))
for key, arrays in nested_arrays.items():
log_arrays_state(arrays=arrays, log_name=log_name + '_' + key.upper(), log_method_name=log_method_name)
if __name__ == '__main__':
log_method = logger.info
    # `log_method` cannot be pickled in `map_async`, therefore an extra `log_method_name` string is handed
    # through instead
log_method_name = 'info'
num_samples = 100
num_processes = 1 # len(MODEL_INPUTS) #
array_shapes = [(num_samples, random.randint(2, 5), random.randint(100, 300)) for _ in range(len(MODEL_INPUTS))]
def stdout_some_newlines(num_lines=2, sleep_time=1):
print(''.join(num_lines * ['\n']))
time.sleep(sleep_time)
# Pool with results from `func` with `return` received from `AsyncResult`(=`ApplyResult`)
# `AsyncResult` also used for process synchronization, e.g. waiting for processes to finish
log_method('MAIN: setting up `Pool.map_async` with `return`ing `func`')
async_return_timer = Timer(time_name='TIMER_POOL: time for random array with {} processes'.format(num_processes),
log_method=log_method)
# Pool with variable return
setup_pool_timer = Timer(time_name='TIMER_SETUP: time to set up pool with {} processes'.format(num_processes),
log_method=log_method)
with mp.Pool(processes=num_processes) as pool:
setup_pool_timer.stop()
arrays = pool.starmap_async(func=random_array, iterable=[(shape, log_method_name) for shape in array_shapes])
getted_arrays = arrays.get()
async_return_timer.stop()
# Logging array state inside the `pool` context manager
        log_method('MAIN: arrays from `pool.map_async() return` within the `pool`\'s context manager:')
log_arrays_state(arrays=arrays, log_method_name=log_method_name)
        log_method('MAIN: arrays.get() from `pool.map_async() return` within the `pool`\'s context manager:')
log_arrays_state(arrays=getted_arrays, log_method_name=log_method_name)
# Logging array state outside the `pool` context manager
log_method('MAIN: arrays from `pool.map_async() return` outside the `pool`\'s context manager:')
log_arrays_state(arrays=arrays, log_method_name=log_method_name)
log_method('MAIN: arrays.get() from `pool.map_async() return` outside the `pool`\'s context manager:')
log_arrays_state(arrays=getted_arrays, log_method_name=log_method_name)
del pool, arrays, getted_arrays
stdout_some_newlines()
    # Functionality of `mp.Process().is_alive()`
    log_method('IS_ALIVE: testing functionality of flag `mp.Process().is_alive()` w.r.t. process status')
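    # Note: a lambda as `target` only works with the 'fork' start method (Linux default);
    # with 'spawn' (the default on Windows, and on macOS since Python 3.8) the target must be picklable.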
p = mp.Process(target=lambda x: x ** 2, args=(10,))
    log_method('IS_ALIVE: after initializing, before starting: p.is_alive() = {}'.format(p.is_alive()))
p.start()
log_method('IS_ALIVE: after starting, before joining: p.is_alive() = {}'.format(p.is_alive()))
time.sleep(5)
log_method('IS_ALIVE: after sleeping 5sec, before joining: p.is_alive() = {}'.format(p.is_alive()))
p.join()
log_method('IS_ALIVE: after joining: p.is_alive() = {}'.format(p.is_alive()))
p.terminate()
del p
stdout_some_newlines()
    # Pool with `func` `return`ing results directly to the result handler from `mp.Pool().starmap_async()` of type
    # `AsyncResult()`
    log_method(
        'MAIN: Pool.map() is not tested explicitly because it is equivalent to `Pool.map() == Pool.map_async().get()`')
stdout_some_newlines()
# Pool with results assigned to shared variable & `AsyncResult` only used for process synchronization but
# not for result receiving
log_method(
'MAIN: setting up Manager(), Manager.list() as shared variable and Pool.starmap_async with results from shared '
'variable')
async_shared_timer = Timer(
time_name='TIMER_POOL_SHARED: time for random array with {} processes'.format(num_processes),
log_method=log_method)
    setup_shared_variable_timer = Timer(time_name='TIMER_INIT: time to set up shared variable', log_method=log_method)
with mp.Manager() as sync_manager:
shared_arrays = sync_manager.list()
setup_shared_variable_timer.stop()
async_return_timer = Timer(
time_name='TIMER_POOL: time for random array with {} processes'.format(num_processes),
log_method=log_method)
setup_pool_timer = Timer(
time_name='TIMER_POOL_INIT: time to set up pool with {} processes'.format(num_processes),
log_method=log_method)
with mp.Pool(processes=num_processes) as pool:
setup_pool_timer.stop()
async_result = pool.starmap_async(
func=random_shared_array,
iterable=[(shape, shared_arrays, log_method_name) for shape in array_shapes])
            log_method('MAIN: async_result.ready() before async_result.wait() = {}'.format(async_result.ready()))
async_result.wait()
log_method('MAIN: async_result.ready() after async.wait() = {}'.format(async_result.ready()))
            log_method('MAIN: async_result.successful() after async_result.wait() = {}'.format(async_result.successful()))
async_return_timer.stop()
copy_timer = Timer('TIMER_COPY: time to copy shared_arrays to standard arrays', log_method=log_method)
unshared_arrays = deepcopy(shared_arrays)
copy_timer.stop()
async_shared_timer.stop()
log_method('MAIN: shared_arrays from `pool.map_async()` within `sync_manager` context manager:')
log_arrays_state(arrays=shared_arrays, log_method_name=log_method_name)
log_method(
'MAIN: unshared_arrays = deepcopy(shared_arrays) from `pool.map_async()` within `sync_manager`\'s '
'context manager:')
log_arrays_state(arrays=unshared_arrays, log_method_name=log_method_name)
log_method('MAIN: shared_arrays from `pool.map_async()` outside `sync_manager`\'s context manager:')
log_arrays_state(arrays=shared_arrays, log_method_name=log_method_name)
log_method('MAIN: unshared_arrays from `pool.map_async()` outside `sync_manager`\'s context manager:')
log_arrays_state(arrays=unshared_arrays, log_method_name=log_method_name)
del sync_manager, shared_arrays, async_result, pool, unshared_arrays
stdout_some_newlines()
# Same as above just with pipe instead of `shared_arrays`
log_method('MAIN: separate process outputting to `mp.Pipe()`')
    process_pipe_timer = Timer(time_name='TIMER_PIPE: time for `random_piped_array` outputting through a `mp.Pipe()`')
arrays = list()
pipe_in, pipe_out = mp.Pipe()
# initialize processes
processes = [mp.Process(target=random_piped_array, args=(shape, pipe_in, log_method_name)) for shape in
array_shapes]
# Start processes
for process in processes:
process.start()
    # Collect piped arrays from the pipe and append them to `arrays`
while (any([process.is_alive() for process in processes]) or pipe_out.poll()) and len(arrays) < len(MODEL_INPUTS):
log_method(
'RANDOM: receiving arrays through pipe and appending to arrays with currently len(arrays) = {}'.format(
len(arrays)))
arrays.append(pipe_out.recv())
# join processes
for process in processes:
process.join()
process_pipe_timer.stop()
log_arrays_state(arrays=arrays, log_method_name=log_method_name)
pipe_in.close()
pipe_out.close()
del arrays, pipe_in, pipe_out, processes, process
stdout_some_newlines()
# Nested shared dict/list/arrays
log_method('MAIN: `random_nested_arrays` with nested shared `mp.Manager().dict()` and `mp.Manager().list()`s')
nested_timer = Timer(time_name='TIMER_NESTED: time for `random_nested_arrays()`')
with mp.Manager() as sync_manager:
nested_shared_arrays = sync_manager.dict()
nested_shared_arrays['train'] = sync_manager.list()
nested_shared_arrays['valid'] = sync_manager.list()
with mp.Pool(processes=num_processes) as pool:
nested_results = pool.starmap_async(func=random_nested_array,
iterable=[(shape, nested_shared_arrays, dataset_name, log_method_name)
for dataset_name in nested_shared_arrays.keys()
for shape in array_shapes])
nested_results.wait()
unshared_nested_arrays = nested_dict_list_deepcopy(nested_shared_arrays)
nested_timer.stop()
log_nested_arrays_state(nested_arrays=unshared_nested_arrays, log_method_name=log_method_name)
del sync_manager, nested_shared_arrays, pool, nested_results, unshared_nested_arrays
stdout_some_newlines()
# List of processes targeted directly to their functions one by one
log_method(
'MAIN: separate process outputting to shared `mp.Manager.list()` with process handles maintained in list()')
    log_method('MAIN: separate process implementations are only preferred over pools for 1-to-1 process-to-task'
               ' relations or asynchronous single-task calculations.')
processes_timer = Timer(
time_name='TIMER_PROCESS: time for `random_shared_arrays` with separate {} processes'.format(num_processes),
log_method=log_method)
with mp.Manager() as sync_manager:
shared_arrays = sync_manager.list()
# Initialize processes
processes = [mp.Process(target=random_shared_array, args=(shape, shared_arrays, log_method_name))
for shape in array_shapes]
# Start processes
for process in processes:
process.start()
processes_timer.stop()
# Join processes = wait for processes to finish
for process in processes:
process.join()
unshared_process_arrays = deepcopy(shared_arrays)
processes_timer.stop()
log_arrays_state(arrays=unshared_process_arrays, log_method_name=log_method_name)
del sync_manager, shared_arrays, unshared_process_arrays, processes, process
stdout_some_newlines()