Since you are operating on fairly big data here, shared memory is a good option for keeping the memory footprint low while parallelizing the job. The multiprocessing
module offers, among other things, Array
for this case:
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
Return a ctypes array allocated from shared memory. By default the return value
is actually a synchronized wrapper for the array. (docs)
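To see what the lock parameter changes in practice, here's a quick sketch (the names locked/unlocked are just for illustration):

import ctypes
from multiprocessing import Array

# Default lock=True: you get a synchronized wrapper; the raw ctypes array
# sits behind .get_obj() and access can be guarded with .get_lock().
locked = Array(ctypes.c_ubyte, 10)            # zero-initialized shared array
with locked.get_lock():
    locked[0] = 255

# lock=False: you get the bare ctypes array directly, plain index access and
# no synchronization. That's sufficient when workers never touch the same
# indexes, which is the case in the code further down.
unlocked = Array(ctypes.c_ubyte, 10, lock=False)
unlocked[0] = 255
print(len(unlocked), unlocked[:5])            # 10 [255, 0, 0, 0, 0]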
The code below also uses multiple processes to create the data. Please get the code for the mp_utils
module from my answer here. The two functions from there create "fair" (roughly equal-sized) ranges over the indexes of your shared array.
These batch_ranges
are sent to the worker processes, and each process works on the part of the shared array covered by its range.
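In case that answer is hard to track down, the two helpers boil down to something like the following sketch (a simplified approximation, not necessarily the exact code from the linked answer):

def calc_batch_sizes(n_items, n_workers):
    """Split n_items into n_workers batch sizes that differ by at most one."""
    quotient, remainder = divmod(n_items, n_workers)
    return [quotient + (i < remainder) for i in range(n_workers)]


def build_batch_ranges(batch_sizes):
    """Turn consecutive batch sizes into range objects over the indexes."""
    indices = [0]
    for size in batch_sizes:
        indices.append(indices[-1] + size)
    return [range(start, stop) for start, stop in zip(indices, indices[1:])]


# calc_batch_sizes(10, 3)       -> [4, 3, 3]
# build_batch_ranges([4, 3, 3]) -> [range(0, 4), range(4, 7), range(7, 10)]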
import random
import ctypes
from time import perf_counter

from multiprocessing import Process, Array
from mp_utils import calc_batch_sizes, build_batch_ranges


def f(data, batch_range):
    """Target processing function."""
    for i in batch_range:
        if data[i] > 127:
            data[i] = 255 - data[i]


def create_data(array, batch_range):
    """Fill specified range of array with random bytes."""
    rd = random.Random(42)  # arbitrary seed 42
    getrandbits = rd.getrandbits  # for speed
    for i in batch_range:
        array[i] = getrandbits(8)


def process_tasks(target, tasks):
    """Process tasks by starting a new process per task."""
    pool = [Process(target=target, args=task) for task in tasks]

    for p in pool:
        p.start()
    for p in pool:
        p.join()


def main(x, y, time, n_workers):

    xyt = x * y * time

    # creating data
    creation_start = perf_counter()  # ----------------------------------------
    # We don't need a lock here, because our processes operate on different
    # subsets of the array.
    sha = Array(ctypes.c_ubyte, xyt, lock=False)  # initialize zeroed array
    batch_ranges = build_batch_ranges(calc_batch_sizes(len(sha), n_workers))
    tasks = [*zip([sha] * n_workers, batch_ranges)]

    process_tasks(target=create_data, tasks=tasks)
    print(f'elapsed for creation: {perf_counter() - creation_start:.2f} s')  # -
    print(sha[:30])

    # process data
    start = perf_counter()  # -------------------------------------------------
    process_tasks(target=f, tasks=tasks)
    print(f'elapsed for processing: {perf_counter() - start:.2f} s')  # -------
    print(sha[:30])


if __name__ == '__main__':

    N_WORKERS = 8

    X = Y = 512
    TIME = 200

    main(X, Y, TIME, N_WORKERS)
Example Output:
elapsed for creation: 5.31 s
[163, 28, 6, 189, 70, 62, 57, 35, 188, 26, 173, 189, 228, 139, 22, 151, 108, 8, 7, 23, 55, 59, 129, 154, 6, 143, 50, 183, 166, 179]
elapsed for processing: 4.36 s
[92, 28, 6, 66, 70, 62, 57, 35, 67, 26, 82, 66, 27, 116, 22, 104, 108, 8, 7, 23, 55, 59, 126, 101, 6, 112, 50, 72, 89, 76]
Process finished with exit code 0
I'm running this on a SandyBridge (2012) machine with 4 physical cores (8 logical with Hyper-Threading), Ubuntu 18.04.
Your original serial code gets:
elapsed for creation: 22.14 s
elapsed for processing: 16.78 s
So I'm getting roughly a 4x speed-up with my code, about as much as my machine has physical cores.
These numbers are for 50 MiB (512x512x200) of data. I also tested with 4 GiB (2048x2048x1000); the timings improved accordingly, from 1500 s (serial) to 366 s (parallel).
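For reference, the serial baseline boils down to one process doing both loops over the whole array, along the lines of this sketch (my reconstruction, your original code may differ in the details):

import random
from time import perf_counter


def serial(x, y, time):
    """Single-process reference: create the data, then apply the same transform."""
    rd = random.Random(42)

    start = perf_counter()
    data = bytearray(rd.getrandbits(8) for _ in range(x * y * time))
    print(f'elapsed for creation: {perf_counter() - start:.2f} s')

    start = perf_counter()
    for i, value in enumerate(data):
        if value > 127:
            data[i] = 255 - value
    print(f'elapsed for processing: {perf_counter() - start:.2f} s')


if __name__ == '__main__':
    serial(512, 512, 200)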