
I am wondering why my CPU load is so low even though I am not getting a high processing rate:

import time
from multiprocessing import Pool
import numpy as np
from skimage.transform import AffineTransform, SimilarityTransform, warp

center_shift = 256 / 2
tf_center = SimilarityTransform(translation=-center_shift)
tf_uncenter = SimilarityTransform(translation=center_shift)


def sample_gen_random_i():
    # Effectively endless stream of random 256x256 4-channel images with dummy labels.
    for _ in range(10000000000000):
        x = np.random.rand(256, 256, 4)
        y = [0]

        yield x, y


def augment(sample):
    x, y = sample
    rotation = 2 * np.pi * np.random.random_sample()
    translation = 5 * np.random.random_sample(), 5 * np.random.random_sample()
    scale_factor = np.random.random_sample() * 0.2 + 0.9
    scale = scale_factor, scale_factor

    tf_augment = AffineTransform(scale=scale, rotation=rotation, translation=translation)
    tf = tf_center + tf_augment + tf_uncenter

    warped_x = warp(x, tf)

    return warped_x, y


def augment_parallel_sample_gen(samples):
    # Using the pool as a context manager ensures the workers are cleaned up
    # even if the consumer stops iterating before the generator is exhausted;
    # a close()/join() placed after the loop would never be reached then.
    with Pool(4) as p:
        for sample in p.imap_unordered(augment, samples, chunksize=10):
            yield sample


def augment_sample_gen(samples):
    for sample in samples:
        yield augment(sample)



# This is slow and a single CPU core is at 100% load
print('Single Thread --> Slow')
samples = sample_gen_random_i()
augmented = augment_sample_gen(samples)

start = time.time()
for i, sample in enumerate(augmented):
    print(str(i) + '|' + str(i / (time.time() - start))[:6] + ' samples / second', end='\r')
    if i >= 2000:
        print(str(i) + '|' + str(i / (time.time() - start))[:6] + ' samples / second')
        break

# This is also slow, and there is only light load on the CPU cores
print('Multiprocessing --> Slow')
samples = sample_gen_random_i()
augmented = augment_parallel_sample_gen(samples)

start = time.time()
for i, sample in enumerate(augmented):
    print(str(i) + '|' + str(i / (time.time() - start))[:6] + ' samples / second', end='\r')
    if i >= 2000:
        print(str(i) + '|' + str(i / (time.time() - start))[:6] + ' samples / second')
        break

I am using multiprocessing.Pool's imap_unordered, but I think there is some overhead. Without augmentation and without multiprocessing I can reach about 500 samples/s, with augmentation but no multiprocessing about 150, and with augmentation plus multiprocessing only about 170, so I suspect something is wrong with my approach. The code should be executable and self-explanatory! :)
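
A quick sanity check of the per-sample payload hints at where the overhead goes: each sample is a 256x256x4 float64 array, so roughly 2 MiB has to be pickled to a worker, and the warped result pickled back, for every single sample. A minimal sketch to verify that, using nothing beyond the standard library and numpy:

import pickle
import numpy as np

x = np.random.rand(256, 256, 4)  # one sample, as produced by the generator
payload = pickle.dumps((x, [0]))
print(len(payload) / 2**20, 'MiB per sample, each way')  # ~2 MiB

At ~170 samples/s that is several hundred MiB/s moving through the pool's pipes, which would fit the picture of low CPU load combined with low throughput.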

Tim Joseph
    Possible duplicate of [Why does multiprocessing use only a single core after I import numpy?](http://stackoverflow.com/questions/15639779/why-does-multiprocessing-use-only-a-single-core-after-i-import-numpy) – vks May 04 '17 at 08:25
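
In case the affinity problem from that link applies here: on Linux you can inspect which cores the process is allowed to run on and widen the set again. A minimal sketch (os.sched_getaffinity/os.sched_setaffinity are Linux-only):

import os

print(os.sched_getaffinity(0))                  # cores this process may run on
os.sched_setaffinity(0, range(os.cpu_count()))  # allow all cores again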

1 Answer


The problem seems to be that with

return warped_x, y

passing the images to the worker processes and passing the whole transformed image back to the main process is the bottleneck. If I only give back, for example, the first pixel

return x[0, 0, 0], y

and move the sample creation into the child processes

def augment(y):
    x = np.random.rand(256, 256, 4)
    rotation = 2 * np.pi * np.random.random_sample()
    ...

the throughput scales nearly linearly with the number of cores.
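
Putting both changes together, a minimal sketch of the reworked worker (same transforms, globals and imports as in the question; the 2000 samples in the driver below are just an arbitrary benchmark length):

def augment(y):
    # Create the sample inside the worker instead of shipping 2 MiB over a pipe.
    x = np.random.rand(256, 256, 4)

    rotation = 2 * np.pi * np.random.random_sample()
    translation = 5 * np.random.random_sample(), 5 * np.random.random_sample()
    scale_factor = np.random.random_sample() * 0.2 + 0.9
    scale = scale_factor, scale_factor

    tf_augment = AffineTransform(scale=scale, rotation=rotation, translation=translation)
    tf = tf_center + tf_augment + tf_uncenter

    warped_x = warp(x, tf)  # the warp still runs, so the CPU work is unchanged

    # Return a single pixel instead of the whole image, keeping the result
    # payload negligible and isolating the IPC cost.
    return x[0, 0, 0], y

Driving it then only requires mapping over the labels, for example p.imap_unordered(augment, ([0] for _ in range(2000)), chunksize=10).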

Maybe threads will work better than processes?
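
If you want to try that, multiprocessing.pool.ThreadPool is a drop-in replacement for Pool with the same API; a minimal sketch (whether it helps depends on how much of warp's work happens with the GIL released, see the comment below):

from multiprocessing.pool import ThreadPool

def augment_threaded_sample_gen(samples):
    # Same shape as the process-based generator, but with threads the samples
    # are shared by reference instead of being pickled back and forth.
    with ThreadPool(4) as p:
        for sample in p.imap_unordered(augment, samples, chunksize=10):
            yield sample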

Tim Joseph
    Give it a try, but keep in mind that in CPython only one thread at a time executes Python bytecode. So threads only help here if enough of the work is done within C extensions that release the global interpreter lock. – BlackJack May 04 '17 at 17:31