I want to use multiprocessing.pool
in tf.data
to speed up my augmentation function. But the result is slower than normal for loop.
multiprocessing.pool cost about: 72s
normal for loop cost about: 57s
My environment: python3.6
, tensorflow-gpu2.4.0
, Ubuntu20.04
Below is my code, what am I doing wrong?
Prerequisite thanks!
import numpy as np
import tensorflow as tf
from functools import partial
import multiprocessing
INPUT_SHAPE = (2000,6)
OUTPUT_SHAPE = (200,6)
def resizing(i ,data, enable, choice):
if i==0:
overlap=0
else:
overlap= 5 if enable >= 0.5 else 0
if choice == 0:
return [np.mean(data[i-overlap: i+10+overlap,0]),
np.mean(data[i-overlap: i+10+overlap,1]),
np.mean(data[i-overlap: i+10+overlap,2]),
np.mean(data[i-overlap: i+10+overlap,3]),
np.mean(data[i-overlap: i+10+overlap,4]),
np.mean(data[i-overlap: i+10+overlap,5])]
elif choice == 1:
return [np.std(data[i-overlap: i+10+overlap,0]),
np.std(data[i-overlap: i+10+overlap,1]),
np.std(data[i-overlap: i+10+overlap,2]),
np.std(data[i-overlap: i+10+overlap,3]),
np.std(data[i-overlap: i+10+overlap,4]),
np.std(data[i-overlap: i+10+overlap,5])]
elif choice == 2:
return [np.max(data[i-overlap: i+10+overlap,0]),
np.max(data[i-overlap: i+10+overlap,1]),
np.max(data[i-overlap: i+10+overlap,2]),
np.max(data[i-overlap: i+10+overlap,3]),
np.max(data[i-overlap: i+10+overlap,4]),
np.max(data[i-overlap: i+10+overlap,5])]
elif choice == 3:
return [np.min(data[i-overlap: i+10+overlap,0]),
np.min(data[i-overlap: i+10+overlap,1]),
np.min(data[i-overlap: i+10+overlap,2]),
np.min(data[i-overlap: i+10+overlap,3]),
np.min(data[i-overlap: i+10+overlap,4]),
np.min(data[i-overlap: i+10+overlap,5])]
def resize_data(data, pool_obj):
choice = tf.random.uniform(shape=(), minval=0,maxval=4,dtype=tf.int64).numpy()
enable = tf.random.uniform(shape=(), minval=0,maxval=1,dtype=tf.float64).numpy()
new_data = pool_obj.map(partial(resizing,
data=data,
enable=enable,
choice=choice),
range(0,2000,10))
# new_data = []
# for i in range(0,2000,10):
# new_data.append(resizing(i ,data, enable, choice))
return np.array(new_data)
def augmentation(data, labels, pool_obj):
def aug(data):
data = data.numpy()
...
# 2000 resize to 200
data = resize_data(data, pool_obj)
...
return tf.convert_to_tensor(data, tf.float64)
data = tf.py_function(aug, [data], [tf.float64])[0]
data.set_shape(OUTPUT_SHAPE)
return data, labels
def test(trainDS):
for d in trainDS:
X, y = d
print(i, X.shape, y.shape)
if __name__ == '__main__':
pool_obj = multiprocessing.Pool()
trainDS = tf.data.Dataset.from_tensor_slices(getDataSet_Path())
trainDS = (
trainDS
.map(load_data, num_parallel_calls=tf.data.AUTOTUNE)
.cache()
.shuffle(300, reshuffle_each_iteration=False)
.map(partial(augmentation, pool_obj=pool_obj), num_parallel_calls=tf.data.AUTOTUNE)
.batch(128, drop_remainder=True)
.prefetch(tf.data.AUTOTUNE)
)
test(trainDS)