
I just upgraded to TensorFlow 2.3 and want to write my own data generator for training. With TensorFlow 1.x, I did this:

def get_data_generator(test_flag):
  item_list = load_item_list(test_flag)
  print('data loaded')
  while True:
    X = []
    Y = []
    for _ in range(BATCH_SIZE):
      x, y = get_random_augmented_sample(item_list)
      X.append(x)
      Y.append(y)
    yield np.asarray(X), np.asarray(Y)

data_generator_train = get_data_generator(False)
data_generator_test = get_data_generator(True)
model.fit_generator(data_generator_train, validation_data=data_generator_test, 
                    epochs=10000, verbose=2,
                    use_multiprocessing=True,
                    workers=8,
                    validation_steps=100,
                    steps_per_epoch=500,
                    )

This code worked fine with TensorFlow 1.x. Eight processes were created, the CPU and GPU were kept fully busy, and "data loaded" was printed 8 times.

With TensorFlow 2.3 I get this warning:

WARNING: tensorflow: multiprocessing can interact badly with TensorFlow, causing nondeterministic deadlocks. For high performance data pipelines tf.data is recommended.

"data loaded" is printed only once (it should be printed 8 times), and the GPU is not fully utilized. Memory also leaks every epoch, so training stops after several epochs. Changing the use_multiprocessing flag did not help.

How can I write a generator/iterator in TensorFlow (Keras) 2.x that is easy to parallelize across multiple CPU processes? I am not concerned about deadlocks or data order.

user1941407
  • Would you be OK with using a `tf.data` pipeline? As the warning stated, that can be parallelized. – jkr Oct 17 '20 at 17:49
  • I tried using tf.keras.utils.Sequence. This class works and is parallelized as needed. The problem is that I have a lot of code for tensorflow 1 using a standard python generator. I want this code to continue working with tensorflow 2 without a lot of rewriting. – user1941407 Oct 18 '20 at 21:13
  • Have you looked into [`tf.data.Dataset.from_generator`](https://www.tensorflow.org/api_docs/python/tf/data/Dataset#from_generator)? You can probably make a `tf.data.Dataset` from your existing generator. I think the generator should yield one sample per iteration, and then you can batch using `dataset.batch(BATCH_SIZE)`. – jkr Oct 18 '20 at 21:50

1 Answer


With a tf.data pipeline, there are several spots where you can parallelize. Depending on how your data are stored and read, you can parallelize reading. You can also parallelize augmentation, and you can prefetch data as you train, so your GPU (or other hardware) is never hungry for data.

In the code below, I have demonstrated how you can parallelize augmentation and add prefetching.

import numpy as np
import tensorflow as tf

x_shape = (32, 32, 3)
y_shape = ()  # A single item (not array).
classes = 10

# In older TensorFlow (e.g. 2.3), use tf.data.experimental.AUTOTUNE instead.
AUTOTUNE = tf.data.AUTOTUNE

def generator_fn(n_samples):
    """Return a function that takes no arguments and returns a generator."""
    def generator():
        for i in range(n_samples):
            # Synthesize an image and a class label.
            x = np.random.random_sample(x_shape).astype(np.float32)
            y = np.random.randint(0, classes, size=y_shape, dtype=np.int32)
            yield x, y
    return generator

def augment(x, y):
    return x * tf.random.normal(shape=x_shape), y

samples = 10
batch_size = 5
epochs = 2

# Create dataset.
gen = generator_fn(n_samples=samples)
dataset = tf.data.Dataset.from_generator(
    generator=gen, 
    output_types=(np.float32, np.int32), 
    output_shapes=(x_shape, y_shape)
)
# Parallelize the augmentation.
dataset = dataset.map(
    augment, 
    num_parallel_calls=AUTOTUNE,
    # Order does not matter.
    deterministic=False
)
dataset = dataset.batch(batch_size, drop_remainder=True)
# Prefetch some batches.
dataset = dataset.prefetch(AUTOTUNE)

# Prepare model.
model = tf.keras.applications.VGG16(weights=None, input_shape=x_shape, classes=classes)
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy")

# Train. Do not specify batch size because the dataset takes care of that.
model.fit(dataset, epochs=epochs)
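
The code above parallelizes augmentation and prefetching but not reading. If the goal is to have several copies of a Python generator running at once, as in the question, one option is `Dataset.interleave` over several `from_generator` datasets. The following is only a minimal sketch under some assumptions: TensorFlow 2.4 or later (for `tf.data.AUTOTUNE` and `output_signature`), and the names `sample_generator`, `make_worker_dataset`, and `NUM_WORKERS` are hypothetical stand-ins for your own loading code. Note that `from_generator` runs the Python code in threads of the same process, not in separate processes, so pure-Python work still contends for the GIL.

```python
import numpy as np
import tensorflow as tf

X_SHAPE = (32, 32, 3)
NUM_CLASSES = 10
NUM_WORKERS = 4  # Hypothetical number of generator copies to run concurrently.
BATCH_SIZE = 5


def sample_generator(worker_id):
    """Hypothetical endless generator; each copy gets its own random seed."""
    rng = np.random.default_rng(int(worker_id))
    while True:
        x = rng.random(X_SHAPE, dtype=np.float32)
        y = np.int32(rng.integers(0, NUM_CLASSES))
        yield x, y


def make_worker_dataset(worker_id):
    """Wrap one generator copy in a dataset; worker_id arrives as a tensor."""
    return tf.data.Dataset.from_generator(
        sample_generator,
        args=(worker_id,),
        output_signature=(
            tf.TensorSpec(shape=X_SHAPE, dtype=tf.float32),
            tf.TensorSpec(shape=(), dtype=tf.int32),
        ),
    )


# One dataset per worker id, pulled from concurrently and in arbitrary order.
dataset = tf.data.Dataset.range(NUM_WORKERS).interleave(
    make_worker_dataset,
    cycle_length=NUM_WORKERS,
    num_parallel_calls=tf.data.AUTOTUNE,
    deterministic=False,
)
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
dataset = dataset.prefetch(tf.data.AUTOTUNE)

# Pull one batch to check shapes and dtypes.
x_batch, y_batch = next(iter(dataset))
```

Because these generators never end, `model.fit` would need `steps_per_epoch` (and `validation_steps` for a validation dataset built the same way), just like the `fit_generator` call in the question.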
M Z
jkr
  • Is it possible to use `tf.keras.utils.Sequence` generator in `tf.data.Dataset.from_generator`? – Innat Mar 06 '21 at 03:48
  • @M.Innat - I'm sure it's possible. Feel free to open a new question. If you link it here, I'd be happy to take a look. – jkr Mar 06 '21 at 14:57
  • I've seen others use the same approach you showed, but I think a few issues come up with the `Sequence` class generator. Please have a look [here](https://github.com/tensorflow/tensorflow/issues/39523). – Innat Mar 08 '21 at 12:50
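
On the `Sequence` question raised in the comments above: a `tf.keras.utils.Sequence` can be fed to `tf.data.Dataset.from_generator` by iterating over its indices in a small wrapper generator. This is a minimal sketch, not a tested recipe for every setup. It assumes TensorFlow 2.4 or later (for `output_signature`), and `RandomBatches` and `batch_generator` are hypothetical names. It is not meant to settle the issues discussed in the linked GitHub thread.

```python
import numpy as np
import tensorflow as tf

BATCH_SIZE = 5
X_SHAPE = (32, 32, 3)
NUM_CLASSES = 10


class RandomBatches(tf.keras.utils.Sequence):
    """Hypothetical Sequence that returns one whole batch per index."""

    def __init__(self, n_batches):
        super().__init__()
        self.n_batches = n_batches

    def __len__(self):
        return self.n_batches

    def __getitem__(self, index):
        x = np.random.random_sample((BATCH_SIZE,) + X_SHAPE).astype(np.float32)
        y = np.random.randint(0, NUM_CLASSES, size=(BATCH_SIZE,), dtype=np.int32)
        return x, y


sequence = RandomBatches(n_batches=4)


def batch_generator():
    """Yield the Sequence's batches in index order."""
    for index in range(len(sequence)):
        yield sequence[index]


# The Sequence already yields batches, so the leading dimension is left as
# None and .batch() is not called again.
dataset = tf.data.Dataset.from_generator(
    batch_generator,
    output_signature=(
        tf.TensorSpec(shape=(None,) + X_SHAPE, dtype=tf.float32),
        tf.TensorSpec(shape=(None,), dtype=tf.int32),
    ),
).prefetch(tf.data.AUTOTUNE)

# Count the batches and inspect the first one.
n_batches_seen = sum(1 for _ in dataset)
first_x, first_y = next(iter(dataset))
```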