I want to use `multiprocessing.Pool` inside a `tf.data` pipeline to speed up my augmentation function, but the result is slower than a plain for loop.

With `multiprocessing.Pool`: about 72 s

With a plain for loop: about 57 s

My environment: Python 3.6, tensorflow-gpu 2.4.0, Ubuntu 20.04

Below is my code. What am I doing wrong?

Thanks in advance!

import numpy as np
import tensorflow as tf
from functools import partial
import multiprocessing

INPUT_SHAPE = (2000,6)
OUTPUT_SHAPE = (200,6)


def resizing(i ,data, enable, choice):
    if i==0:
        overlap=0
    else:
        overlap= 5 if enable >= 0.5 else 0
    if choice == 0:
        return [np.mean(data[i-overlap: i+10+overlap,0]),
                np.mean(data[i-overlap: i+10+overlap,1]),
                np.mean(data[i-overlap: i+10+overlap,2]),
                np.mean(data[i-overlap: i+10+overlap,3]),
                np.mean(data[i-overlap: i+10+overlap,4]),
                np.mean(data[i-overlap: i+10+overlap,5])]
    elif choice == 1:
        return [np.std(data[i-overlap: i+10+overlap,0]),
                np.std(data[i-overlap: i+10+overlap,1]),
                np.std(data[i-overlap: i+10+overlap,2]),
                np.std(data[i-overlap: i+10+overlap,3]),
                np.std(data[i-overlap: i+10+overlap,4]),
                np.std(data[i-overlap: i+10+overlap,5])]
    elif choice == 2:
        return [np.max(data[i-overlap: i+10+overlap,0]),
                np.max(data[i-overlap: i+10+overlap,1]),
                np.max(data[i-overlap: i+10+overlap,2]),
                np.max(data[i-overlap: i+10+overlap,3]),
                np.max(data[i-overlap: i+10+overlap,4]),
                np.max(data[i-overlap: i+10+overlap,5])]
    elif choice == 3:
        return [np.min(data[i-overlap: i+10+overlap,0]),
                np.min(data[i-overlap: i+10+overlap,1]),
                np.min(data[i-overlap: i+10+overlap,2]),
                np.min(data[i-overlap: i+10+overlap,3]),
                np.min(data[i-overlap: i+10+overlap,4]),
                np.min(data[i-overlap: i+10+overlap,5])]

def resize_data(data, pool_obj):

    choice = tf.random.uniform(shape=(), minval=0,maxval=4,dtype=tf.int64).numpy()
    enable = tf.random.uniform(shape=(), minval=0,maxval=1,dtype=tf.float64).numpy()
    new_data = pool_obj.map(partial(resizing, 
                                    data=data,
                                    enable=enable, 
                                    choice=choice), 
                            range(0,2000,10))
  # new_data = []
  # for i in range(0,2000,10):
  #     new_data.append(resizing(i ,data, enable, choice))
    
    return np.array(new_data)


def augmentation(data, labels, pool_obj):
    def aug(data):
        data = data.numpy()
        
        ...      

        # 2000 resize to 200
        data = resize_data(data, pool_obj)
        
        ...
        
        return tf.convert_to_tensor(data, tf.float64)

    data = tf.py_function(aug, [data], [tf.float64])[0]
    data.set_shape(OUTPUT_SHAPE)
    return data, labels

def test(trainDS):
    for i, d in enumerate(trainDS):
        X, y = d
        print(i, X.shape, y.shape)
        

if __name__ == '__main__':
    pool_obj = multiprocessing.Pool()
    trainDS = tf.data.Dataset.from_tensor_slices(getDataSet_Path())
    trainDS = (
        trainDS
        .map(load_data, num_parallel_calls=tf.data.AUTOTUNE)
        .cache()
        .shuffle(300, reshuffle_each_iteration=False)
        .map(partial(augmentation, pool_obj=pool_obj), num_parallel_calls=tf.data.AUTOTUNE)
        .batch(128, drop_remainder=True)
        .prefetch(tf.data.AUTOTUNE)
    )
    
    test(trainDS)
Linminxiang
  • Why not just use `num_parallel_calls` argument of `.map` ? – bluesummers Jan 06 '22 at 11:59
  • @bluesummers , I have used `tf.data.AUTOTUNE` in `.map`, can I set something else here? – Linminxiang Jan 06 '22 at 12:03
  • Yeah, you can set it to any number you like - and it uses your cores. So when you use `num_parallel_calls` you don't actually need multiprocessing as you are already utilizing your cores efficiently – bluesummers Jan 06 '22 at 12:04
  • Maybe I need to reconstruct the content of my function with `tf` to effectively accelerate it? – Linminxiang Jan 06 '22 at 12:07
  • @Linminxiang, we have to remove all `numpy` and `if` things from all functions inside `map` not to accelerate but to make it work properly. – Alexey Tochin Jan 06 '22 at 12:14
  • @AlexeyTochin , Ok, let me try, I am not very familiar with `tf` yet. – Linminxiang Jan 06 '22 at 12:22
  • @Linminxiang, [this](https://stackoverflow.com/questions/69709804/how-to-randomly-select-from-set-of-functions-in-tensorflow-using-tf-function/70323219#70323219) and [this](https://stackoverflow.com/questions/70332161/tensorflow-what-is-the-difference-between-using-tf-greater-and-simply-greater/70367665#70367665) should help you. – Alexey Tochin Jan 06 '22 at 12:30
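
Independently of the TensorFlow rewrite suggested in the comments, the 200 per-window Python calls can be collapsed into a few array operations. The pool version ships 200 tiny tasks per sample to worker processes, so inter-process overhead is a plausible reason for the slowdown (an assumption, not something measured here). The sketch below is a vectorized NumPy stand-in for the `resizing` loop. The name `resize_data_np` and the `step` and `overlap` parameters are hypothetical and not part of the original code.

```python
import numpy as np

# Same order as `choice` in the question: 0 mean, 1 std, 2 max, 3 min.
REDUCERS = (np.mean, np.std, np.max, np.min)


def resize_data_np(data, enable, choice, step=10, overlap=5):
    """Vectorized stand-in for the question's resizing() loop (hypothetical helper).

    Assumes `data` has shape (n, channels), with n a multiple of `step`
    and n >= 2 * step.
    """
    reduce_fn = REDUCERS[choice]
    n = data.shape[0]

    if enable < 0.5:
        # No overlap: split the rows into consecutive blocks of `step` rows.
        return reduce_fn(data.reshape(n // step, step, -1), axis=1)

    # Overlap enabled. The first window (i == 0) gets no overlap and the last
    # window is cut off by the end of the array, so both are handled separately.
    first = reduce_fn(data[:step], axis=0, keepdims=True)
    last = reduce_fn(data[n - step - overlap:], axis=0, keepdims=True)

    # Interior windows start at i - overlap for i = step, 2 * step, ..., n - 2 * step.
    starts = np.arange(step, n - step, step) - overlap
    idx = starts[:, None] + np.arange(step + 2 * overlap)[None, :]
    middle = reduce_fn(data[idx], axis=1)  # data[idx]: (windows, width, channels)

    return np.concatenate([first, middle, last], axis=0)
```

Inside `resize_data`, a call such as `resize_data_np(data, enable, choice)` would take the place of `pool_obj.map(...)`, so no pool object has to be passed around.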

1 Answer

The TensorFlow Dataset API already has built-in parallel processing. Just use the `num_parallel_calls` parameter of `map` together with `prefetch`, without any Python multiprocessing tools. In addition, pass `map` only TensorFlow-style functions that can be converted to a graph. In particular, avoid Python `if` blocks and use `tf.cond`, `tf.where`, and similar ops instead. NumPy routines are also not recommended; use their TensorFlow equivalents. Follow guides like this.
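
As a rough illustration of that advice, here is a minimal sketch of the resizing step written with TensorFlow ops only, so that `map` can trace it into a graph without `tf.py_function` or a pool. It uses `tf.switch_case` in place of the `if`/`elif` chain on `choice` and `tf.cond` for the `enable` test. The helper names `_reduce` and `resize_windows`, and the constants `STEP` and `OVERLAP`, are hypothetical. The steps elided with `...` in the question are not reproduced, and the input is assumed to have the static shape `INPUT_SHAPE`.

```python
import tensorflow as tf

INPUT_SHAPE = (2000, 6)   # as in the question
STEP = 10                 # stride of range(0, 2000, 10)
OVERLAP = 5               # extra rows per side when overlap is enabled


def _reduce(x, axis, choice, keepdims=False):
    # tf.switch_case replaces the Python if/elif chain on `choice`
    # (0 mean, 1 std, 2 max, 3 min, as in the question).
    return tf.switch_case(choice, branch_fns=[
        lambda: tf.reduce_mean(x, axis=axis, keepdims=keepdims),
        lambda: tf.math.reduce_std(x, axis=axis, keepdims=keepdims),
        lambda: tf.reduce_max(x, axis=axis, keepdims=keepdims),
        lambda: tf.reduce_min(x, axis=axis, keepdims=keepdims),
    ])


def resize_windows(data, enable, choice):
    """Hypothetical graph-friendly counterpart of resize_data()."""
    n = INPUT_SHAPE[0]

    def without_overlap():
        # Consecutive blocks of STEP rows: (2000, 6) -> (200, 10, 6).
        blocks = tf.reshape(data, (n // STEP, STEP, INPUT_SHAPE[1]))
        return _reduce(blocks, 1, choice)

    def with_overlap():
        # The first window has no overlap; the last one is cut off at the end.
        first = _reduce(data[:STEP], 0, choice, keepdims=True)
        last = _reduce(data[n - STEP - OVERLAP:], 0, choice, keepdims=True)
        # Interior windows, gathered by an index matrix of shape (198, 20).
        starts = tf.range(STEP, n - STEP, STEP) - OVERLAP
        idx = starts[:, tf.newaxis] + tf.range(STEP + 2 * OVERLAP)[tf.newaxis, :]
        middle = _reduce(tf.gather(data, idx), 1, choice)
        return tf.concat([first, middle, last], axis=0)

    # tf.cond replaces the Python `if enable >= 0.5`.
    return tf.cond(enable >= 0.5, with_overlap, without_overlap)


def augmentation(data, labels):
    # Random draws happen inside the graph, once per dataset element.
    choice = tf.random.uniform(shape=(), minval=0, maxval=4, dtype=tf.int32)
    enable = tf.random.uniform(shape=(), minval=0, maxval=1, dtype=tf.float64)
    return resize_windows(data, enable, choice), labels
```

With this version the pipeline line would read `.map(augmentation, num_parallel_calls=tf.data.AUTOTUNE)`, with no `partial` and no `pool_obj`.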

Alexey Tochin
  • Thank you very much for your comments. When I replaced most of `if else` and `numpy` with tensor, the execution efficiency improved a lot. – Linminxiang Jan 07 '22 at 12:11