
Hi, I am studying the Dataset API in TensorFlow and I have a question regarding the dataset.map() function, which performs data preprocessing.

file_names = ["image1.jpg", "image2.jpg", ...]
im_dataset = tf.data.Dataset.from_tensor_slices(file_names)
# Pass the parsing function itself (image_parser, not image_parser()) to tf.py_func
im_dataset = im_dataset.map(lambda image: tuple(tf.py_func(image_parser, [image], [tf.float32, tf.float32, tf.float32])))
im_dataset = im_dataset.batch(batch_size)
iterator = im_dataset.make_initializable_iterator()

The dataset takes in image names and parses each one into three tensors (three pieces of information about the image).

If I have a very large number of images in my training folder, preprocessing them is going to take a long time. My question is: since the Dataset API is said to be designed for efficient input pipelines, is the preprocessing done for the whole dataset before I feed it to my workers (let's say GPUs), or does it only preprocess one batch of images each time I call iterator.get_next()?

Jiang Wenbo
    I suggest you have a look at this post: https://stackoverflow.com/questions/46444018/meaning-of-buffer-size-in-dataset-map-dataset-prefetch-and-dataset-shuffle – ma3oun Feb 11 '18 at 14:34
  • @ma3oun Hi, thanks for your reply! That resolved my doubts :). I still have a small question: when I set "batch_size" of dataset.prefetch() to a value, should I also assign "num_parallel_calls" in dataset.map() to the same value? – Jiang Wenbo Feb 11 '18 at 15:35
  • num_parallel_calls is used when you want multiple threads to read input data. It is not related to batch_size. – ma3oun Feb 11 '18 at 15:45
  • @ma3oun, thanks for following up. I made a mistake (it's buffer_size instead of batch_size). The way I see it, when the buffer_size is larger, more threads are needed to fill up the buffer? – Jiang Wenbo Feb 11 '18 at 15:49
  • Exactly. In any case, I would make sure the buffer_size is at least a bit larger than the batch_size. It should be much larger with more reading threads. – ma3oun Feb 11 '18 at 16:04
  • @ma3oun Thanks, your answers are very helpful! :) – Jiang Wenbo Feb 11 '18 at 16:14
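
To make the behaviour discussed in these comments concrete, here is a minimal sketch of such a pipeline. It is only an illustration: the parse function, image paths, and parameter values are placeholders, not part of the original question. Preprocessing is lazy; num_parallel_calls and prefetch only control how many elements are processed ahead of the consumer.

import tensorflow as tf

def parse_image(file_name):
    # Hypothetical parser: decode a JPEG, scale to [0, 1], resize so batching works
    image = tf.image.decode_jpeg(tf.read_file(file_name), channels=3)
    image = tf.image.convert_image_dtype(image, tf.float32)
    return tf.image.resize_images(image, [224, 224])

file_names = ["image1.jpg", "image2.jpg"]  # placeholder paths
dataset = tf.data.Dataset.from_tensor_slices(file_names)
dataset = dataset.map(parse_image, num_parallel_calls=4)  # 4 parsing threads
dataset = dataset.batch(2)
dataset = dataset.prefetch(buffer_size=1)  # keep one batch ready in advance

iterator = dataset.make_initializable_iterator()
next_batch = iterator.get_next()

with tf.Session() as sess:
    sess.run(iterator.initializer)
    # Nothing has been parsed up to this point: each run() pulls just
    # enough elements through map() to assemble one batch.
    images = sess.run(next_batch)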

1 Answer


If your preprocessing pipeline is very long and the output is small, the processed data should fit in memory. If this is the case, you can use tf.data.Dataset.cache to cache the processed data in memory or in a file.

From the official performance guide:

The tf.data.Dataset.cache transformation can cache a dataset, either in memory or on local storage. If the user-defined function passed into the map transformation is expensive, apply the cache transformation after the map transformation as long as the resulting dataset can still fit into memory or local storage. If the user-defined function increases the space required to store the dataset beyond the cache capacity, consider pre-processing your data before your training job to reduce resource usage.


Example use of cache in memory

Here is an example where each pre-processing step takes a lot of time (0.5 s). The second epoch over the dataset will be much faster than the first.

import time
import tensorflow as tf

def my_fn(x):
    time.sleep(0.5)  # simulate an expensive preprocessing step
    return x

def parse_fn(x):
    return tf.py_func(my_fn, [x], tf.int64)

dataset = tf.data.Dataset.range(5)
dataset = dataset.map(parse_fn)
dataset = dataset.cache()    # cache the processed dataset, so every input will be processed once
dataset = dataset.repeat(2)  # repeat for multiple epochs

res = dataset.make_one_shot_iterator().get_next()

with tf.Session() as sess:
    for i in range(10):
        # First 5 iterations will take 0.5s each, last 5 will not
        print(sess.run(res))

Caching to a file

If you want to write the cached data to a file, you can provide an argument to cache():

dataset = dataset.cache('/tmp/cache')  # will write cached data to a file

This allows you to process the dataset only once and to run multiple experiments on the data without reprocessing it.

Warning: You have to be careful when caching to a file. If you change your data but keep the /tmp/cache.* files, the old cached data will still be read. For instance, if we use the code from above but change the range of the data to [10, 15), we will still obtain values in [0, 5):

dataset = tf.data.Dataset.range(10, 15)
dataset = dataset.map(parse_fn)
dataset = dataset.cache('/tmp/cache')
dataset = dataset.repeat(2)  # repeat for multiple epochs

res = dataset.make_one_shot_iterator().get_next()

with tf.Session() as sess:
    for i in range(10):
        print(sess.run(res))  # will still be in [0, 5)...

Always delete the cached files whenever the data that you want to cache changes.
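
For instance, one way to clear the cache files, assuming the /tmp/cache prefix used above:

import glob
import os

# dataset.cache('/tmp/cache') writes several files with this prefix
# (data shards, an index, and possibly a lockfile); remove them all.
for f in glob.glob('/tmp/cache.*'):
    os.remove(f)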

Another issue may arise if you interrupt the script before all the data is cached. You will get an error like this:

AlreadyExistsError (see above for traceback): There appears to be a concurrent caching iterator running - cache lockfile already exists ('/tmp/cache.lockfile'). If you are sure no other running TF computations are using this cache prefix, delete the lockfile and re-initialize the iterator.

Make sure that the whole dataset gets processed, so that the cache file is complete.
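
A simple way to ensure this, sketched below with the same TF 1.x API and the parse_fn defined earlier, is to run one warm-up pass over the whole dataset before training:

dataset = tf.data.Dataset.range(5)
dataset = dataset.map(parse_fn)
dataset = dataset.cache('/tmp/cache')

# Warm-up pass: consume every element once so the cache file is complete.
warmup = dataset.make_one_shot_iterator().get_next()
with tf.Session() as sess:
    try:
        while True:
            sess.run(warmup)
    except tf.errors.OutOfRangeError:
        pass  # the entire dataset has been processed and cached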

Olivier Moindrot