I'm writing a program that operates on very large files of bytes data (e.g. 4 GB for x = 2048, y = 2048, time = 1000 in the code below). In some cases the files could be up to 16 GB. I think absolute_bytearray(data) could be accelerated at least four times with multiprocessing (because when I run the program, only around 28% of the CPU is loaded):

How to Multi-thread an Operation Within a Loop in Python

How do I apply multiprocessing correctly to my code?

from time import perf_counter
from random import getrandbits

x = 512
y = 512
time = 200

xyt = x*y*time

my_by = bytearray(getrandbits(8) for _ in range(xyt))  # _ avoids shadowing x

def absolute_bytearray(data):
    for i in range(len(data)):
        if data[i] > 127:
            data[i] = 255 - data[i]
    return data

start = perf_counter()
absolute_bytearray(my_by)
end = perf_counter()
print('time abs my_by = %.2f' % (end - start))  # around 6.70 s for 512*512*200

Or maybe you know a faster solution?

Darkonaut
Michael
  • You are actually going to lose performance here with multithreading (or multiprocessing). Say you split the array into chunks and let each thread process a chunk: for this kind of operation Python can only run one thread at a time, so the other threads would be waiting (google "Python GIL"), and you end up losing time to thread start-up and so on. If you were to use multiprocessing, you would need some kind of inter-process communication, and slicing, copying and passing lists around takes a long time and can be heavy on memory. In C or C++ you could make it faster with threads. – Milos Matovic Dec 12 '18 at 13:53
  • I've tried the 3 scenarios as in my comment above and got: No adaptations = 6.12s, 5 threads = 35.24s, 5 processes = 5.48s (but double memory usage) – Milos Matovic Dec 12 '18 at 15:08

1 Answer

Since you operate on big-ish data here, using shared memory would be a good option to keep the memory footprint low while parallelizing the job. Among other things, the multiprocessing module offers Array for this case:

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

Return a ctypes array allocated from shared memory. By default the return value is actually a synchronized wrapper for the array. (from the multiprocessing docs)
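
As a minimal sketch of how such a shared array behaves (the names invert_high_bytes and run_demo and the six sample values are made up for illustration), two processes below each modify their own half of one Array created with lock=False:

```python
import ctypes
from multiprocessing import Process, Array


def invert_high_bytes(shared, start, stop):
    """Replace every value above 127 in shared[start:stop] with 255 - value."""
    for i in range(start, stop):
        if shared[i] > 127:
            shared[i] = 255 - shared[i]


def run_demo():
    values = [0, 100, 127, 128, 200, 255]  # illustrative sample data
    # lock=False returns the raw ctypes array without the synchronized wrapper.
    shared = Array(ctypes.c_ubyte, values, lock=False)
    half = len(shared) // 2
    workers = [
        Process(target=invert_high_bytes, args=(shared, 0, half)),
        Process(target=invert_high_bytes, args=(shared, half, len(shared))),
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    # The parent sees the children's writes because the memory is shared.
    return list(shared)


if __name__ == '__main__':
    print(run_demo())  # -> [0, 100, 127, 127, 55, 0]
```

Skipping the lock is only safe here because the two index ranges do not overlap.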

The code below also uses multiple processes to create the data. Please get the code for the mp_utils module from my answer here. The two functions from there create "fair" ranges over the indexes of your shared array. These batch_ranges are sent to the worker processes, and each process works on the shared array at the indexes contained in its range.
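
If the mp_utils module is not at hand, the two helpers could look roughly like this. This is a stand-in sketch, not the original module: only the function names and the way they are called are taken from the code in this answer, and the splitting rule (batch sizes that differ by at most one) is an assumption about what "fair" means here.

```python
from itertools import accumulate


def calc_batch_sizes(n_tasks, n_workers):
    """Split n_tasks into n_workers sizes that differ by at most one.

    Hypothetical stand-in for the mp_utils function of the same name.
    """
    base, extra = divmod(n_tasks, n_workers)
    # The first `extra` workers take one additional item each.
    return [base + 1] * extra + [base] * (n_workers - extra)


def build_batch_ranges(batch_sizes):
    """Turn batch sizes into consecutive, non-overlapping index ranges.

    Hypothetical stand-in for the mp_utils function of the same name.
    """
    upper_bounds = list(accumulate(batch_sizes))
    lower_bounds = [0] + upper_bounds[:-1]
    return [range(lo, hi) for lo, hi in zip(lower_bounds, upper_bounds)]


if __name__ == '__main__':
    print(calc_batch_sizes(10, 4))  # -> [3, 3, 2, 2]
    print(build_batch_ranges(calc_batch_sizes(10, 4)))
    # -> [range(0, 3), range(3, 6), range(6, 8), range(8, 10)]
```

Saved as mp_utils.py next to the script, this should satisfy the import used below, though timings may differ from those obtained with the original module.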

import random
import ctypes
from time import perf_counter
from multiprocessing import Process, Array

from mp_utils import calc_batch_sizes, build_batch_ranges


def f(data, batch_range):
    """Target processing function."""
    for i in batch_range:
        if data[i] > 127:
            data[i] = 255 - data[i]


def create_data(array, batch_range):
    """Fill specified range of array with random bytes."""
    rd = random.Random(42)  # arbitrary seed 42
    getrandbits = rd.getrandbits  # for speed
    for i in batch_range:
        array[i] = getrandbits(8)


def process_tasks(target, tasks):
    """Process tasks by starting a new process per task."""
    pool = [Process(target=target, args=task) for task in tasks]

    for p in pool:
        p.start()
    for p in pool:
        p.join()


def main(x, y, time, n_workers):

    xyt = x * y * time

    # creating data
    creation_start = perf_counter()  # ----------------------------------------
    # We don't need a lock here, because our processes operate on different
    # subsets of the array.
    sha = Array(ctypes.c_ubyte, xyt, lock=False)  # initialize zeroed array
    batch_ranges = build_batch_ranges(calc_batch_sizes(len(sha), n_workers))
    tasks = [*zip([sha] * n_workers, batch_ranges)]

    process_tasks(target=create_data, tasks=tasks)
    print(f'elapsed for creation: {perf_counter() - creation_start:.2f} s')  #-
    print(sha[:30])

    # process data
    start = perf_counter()  # -------------------------------------------------
    process_tasks(target=f, tasks=tasks)
    print(f'elapsed for processing: {perf_counter() - start:.2f} s')  # -------
    print(sha[:30])


if __name__ == '__main__':

    N_WORKERS = 8
    X = Y = 512
    TIME = 200

    main(X, Y, TIME, N_WORKERS)

Example Output:

elapsed for creation: 5.31 s
[163, 28, 6, 189, 70, 62, 57, 35, 188, 26, 173, 189, 228, 139, 22, 151, 108, 8, 7, 23, 55, 59, 129, 154, 6, 143, 50, 183, 166, 179]
elapsed for processing: 4.36 s
[92, 28, 6, 66, 70, 62, 57, 35, 67, 26, 82, 66, 27, 116, 22, 104, 108, 8, 7, 23, 55, 59, 126, 101, 6, 112, 50, 72, 89, 76]

Process finished with exit code 0

I'm running this on a Sandy Bridge (2012) machine with 8 logical cores (4 physical cores with Hyper-Threading), Ubuntu 18.04.

Your original serial code gets:

elapsed for creation: 22.14 s
elapsed for processing: 16.78 s

So I'm getting about a fourfold speed-up with my code (roughly matching the number of real cores in my machine).
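
Because the speed-up tracks physical rather than logical cores, it can be worth timing more than one worker count. The sketch below is only a heuristic: os.cpu_count() reports logical CPUs, and the assumption of two hardware threads per physical core (as well as the helper name suggest_worker_counts) is mine and will not hold on every machine.

```python
import os


def suggest_worker_counts(logical_cpus=None):
    """Return candidate worker counts to benchmark, smallest first.

    Assumes two hardware threads per physical core; this is a guess,
    since the standard library only reports logical CPUs.
    """
    if logical_cpus is None:
        logical_cpus = os.cpu_count() or 1
    physical_guess = max(1, logical_cpus // 2)
    # A set removes the duplicate when both candidates are equal.
    return sorted({physical_guess, logical_cpus})


if __name__ == '__main__':
    print(suggest_worker_counts(8))  # -> [4, 8]
    print(suggest_worker_counts())   # depends on the machine
```

Each returned value can be passed as n_workers to main() above to see which one gives the better timing.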

These numbers are for 50 MiB (512x512x200) of data. I also tested with 4 GiB (2048x2048x1000); timings improved accordingly, from 1500 s (serial) to 366 s (parallel).
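
The question also asks about faster solutions in general. One single-process option, which is not part of the measurements above and was not benchmarked here, is a 256-entry lookup table with bytearray.translate. The translation runs in C inside the interpreter, so it avoids the per-byte Python loop. The function name and the chunk_size default are illustrative choices; working in chunks keeps the temporary copies small for multi-gigabyte inputs.

```python
def absolute_bytearray_translate(data, chunk_size=1 << 24):
    """Apply the same rule as absolute_bytearray, in place, via a lookup table."""
    # table[v] is the result for byte value v: 255 - v above 127, else v.
    table = bytes(255 - v if v > 127 else v for v in range(256))
    for start in range(0, len(data), chunk_size):
        stop = start + chunk_size
        # Each slice is a temporary copy of at most chunk_size bytes;
        # the translated result has the same length, so data is not resized.
        data[start:stop] = data[start:stop].translate(table)
    return data


if __name__ == '__main__':
    sample = bytearray([0, 100, 127, 128, 200, 255])
    print(list(absolute_bytearray_translate(sample)))
    # -> [0, 100, 127, 127, 55, 0]
```

This only works on a bytearray (or another object offering translate), so the shared ctypes Array above would need a different approach.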

Darkonaut
  • Yes, but the performance gain for the absolute operation is not fourfold: e.g. 5.42 s (your code) vs 6.73 s (original code). I want to try it out on bigger files, but I have some other problems with reading big files and integrating your code into my program. I need some time before I come back to this problem :) – Michael Dec 18 '18 at 14:37
  • @Michael How is it with the exact same code from my answer? Do you get similar improvements compared to your serial example in your question? – Darkonaut Dec 18 '18 at 21:46
  • Sorry, but it doesn't. When I apply your code to my 50 MB data it needs 14 s against 4 s in serial. I am doing something wrong. But now I have another problem: I need to divide the data into n batches from the beginning and apply multiprocessing at this step, and after that read the batches in small chunks, apply function f to every chunk, sum the results and collect them into one list. – Michael Jan 15 '19 at 13:42
  • @Michael I'm asking about **exactly** my code, so with my generated data, not _your_ 50MB data. Did you set `lock=False` like in my code? – Darkonaut Jan 15 '19 at 14:01
  • Not 4 times faster in processing: 5.85 s for your code vs 7.2 s for the original code, as I wrote on Dec 18 '18 at 14:37. – Michael Jan 15 '19 at 15:01
  • @Michael If that's the timing for the exact code, I cannot comprehend how you get to 14 s just by switching the data if it's the same size. What I can say generally is that multiprocessing always involves some overhead, so if your computation is already relatively short (see my timings in contrast), you cannot expect the timings to go down linearly forever as the number of processes goes up. If your computation is too light to earn back the overhead, adding multiprocessing will even worsen the timings. – Darkonaut Jan 15 '19 at 15:21
  • @Darkonaut For comparison: X=Y=512, time=1000. Results: my code: creation 57 s, processing 37.5 s. Your code: creation 33 s, processing 27.8 s. But thanks a lot for trying to help! – Michael Jan 15 '19 at 15:33
  • @Michael Which CPU model do you have? – Darkonaut Jan 15 '19 at 15:37
  • @Darkonaut Intel(R) Core(TM) i5-5200U CPU @ 2.20GHz – Michael Jan 17 '19 at 10:03
  • @Michael You have only two real cores with this CPU. In a tight-loop computation this means you can expect about a twofold speed-up from using two workers. Using more processes than that can even be harmful. Your maximum number of parallel workers with this CPU is four (four hardware threads on two cores), but this does not mean it can improve timings much more in your case. Try with two and with four workers. If you use all four hardware threads, it also depends on what else is running on your machine, since that needs CPU time too. – Darkonaut Jan 17 '19 at 15:37