
I am trying to prefetch training data to hide I/O latency. I would like to write custom Python code that loads data from disk and preprocesses the data (e.g. by adding a context window). In other words, one thread does data preprocessing and the other does training. Is this possible in TensorFlow?

Update: I have a working example based on @mrry's example.

import numpy as np
import tensorflow as tf
import threading

BATCH_SIZE = 5
TRAINING_ITERS = 4100

feature_input = tf.placeholder(tf.float32, shape=[128])
label_input = tf.placeholder(tf.float32, shape=[128])

q = tf.FIFOQueue(200, [tf.float32, tf.float32], shapes=[[128], [128]])
enqueue_op = q.enqueue([label_input, feature_input])

label_batch, feature_batch = q.dequeue_many(BATCH_SIZE)
c = tf.reshape(feature_batch, [BATCH_SIZE, 128]) + tf.reshape(label_batch, [BATCH_SIZE, 128])

sess = tf.Session()

def load_and_enqueue(sess, enqueue_op, coord):
  with open('dummy_data/features.bin', 'rb') as feature_file, open('dummy_data/labels.bin', 'rb') as label_file:
    while not coord.should_stop():
      feature_array = np.fromfile(feature_file, np.float32, 128)
      if feature_array.shape[0] == 0:
        print('reach end of file, reset using seek(0,0)')
        feature_file.seek(0,0)
        label_file.seek(0,0)
        continue
      label_value = np.fromfile(label_file, np.float32, 128)

      sess.run(enqueue_op, feed_dict={feature_input: feature_array,
                                      label_input: label_value})

coord = tf.train.Coordinator()
t = threading.Thread(target=load_and_enqueue, args=(sess,enqueue_op, coord))
t.start()

for i in range(TRAINING_ITERS):
  batch_sum = sess.run(c)
  print('train_iter='+str(i))
  print(batch_sum)

coord.request_stop()
coord.join([t])
read Read
  • I just made a notebook about queues that also explains a similar use case, I hope it can be useful to others as well: https://gist.github.com/akiross/23b6ae42812841bb79af4976a2525cf9 – AkiRoss Apr 03 '17 at 16:53
  • @AkiRoss this website can't be accessed... – Shiqing Fan Jul 26 '18 at 10:14

2 Answers


This is a common use case, and most implementations use TensorFlow's queues to decouple the preprocessing code from the training code. There is a tutorial on how to use queues, but the main steps are as follows:

  1. Define a queue, q, that will buffer the preprocessed data. TensorFlow supports the simple tf.FIFOQueue that produces elements in the order they were enqueued, and the more advanced tf.RandomShuffleQueue that produces elements in a random order (see the sketch after this list). A queue element is a tuple of one or more tensors (which can have different types and shapes). All queues support single-element (enqueue, dequeue) and batch (enqueue_many, dequeue_many) operations, but to use the batch operations you must specify the shapes of each tensor in a queue element when constructing the queue.

  2. Build a subgraph that enqueues preprocessed elements into the queue. One way to do this would be to define some tf.placeholder() ops for tensors corresponding to a single input example, then pass them to q.enqueue(). (If your preprocessing produces a batch at once, you should use q.enqueue_many() instead.) You might also include TensorFlow ops in this subgraph.

  3. Build a subgraph that performs training. This will look like a regular TensorFlow graph, but will get its input by calling q.dequeue_many(BATCH_SIZE).

  4. Start your session.

  5. Create one or more threads that execute your preprocessing logic, then execute the enqueue op, feeding in the preprocessed data. You may find the tf.train.Coordinator and tf.train.QueueRunner utility classes useful for this (a sketch using the Coordinator follows the code fragment below).

  6. Run your training graph (optimizer, etc.) as normal.
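
A minimal sketch (not part of the original answer) of constructing the tf.RandomShuffleQueue mentioned in step 1, using the same element types as the code fragment below; min_after_dequeue is the number of elements that must remain buffered after each dequeue so that the shuffling stays well mixed:

q = tf.RandomShuffleQueue(capacity=200, min_after_dequeue=50,
                          dtypes=[tf.float32, tf.int32], shapes=[[100], []])
# Drop-in alternative to the tf.FIFOQueue used below; enqueue/dequeue usage is identical.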

EDIT: Here's a simple load_and_enqueue() function and code fragment to get you started:

# Features are length-100 vectors of floats
feature_input = tf.placeholder(tf.float32, shape=[100])
# Labels are scalar integers.
label_input = tf.placeholder(tf.int32, shape=[])

# Alternatively, could do:
# feature_batch_input = tf.placeholder(tf.float32, shape=[None, 100])
# label_batch_input = tf.placeholder(tf.int32, shape=[None])

q = tf.FIFOQueue(100, [tf.float32, tf.int32], shapes=[[100], []])
enqueue_op = q.enqueue([feature_input, label_input])

# For batch input, do:
# enqueue_op = q.enqueue_many([feature_batch_input, label_batch_input])

feature_batch, label_batch = q.dequeue_many(BATCH_SIZE)
# Build rest of model taking label_batch, feature_batch as input.
# [...]
train_op = ...

sess = tf.Session()

def load_and_enqueue():
  with open(...) as feature_file, open(...) as label_file:
    while True:
      feature_array = numpy.fromfile(feature_file, numpy.float32, 100)
      if feature_array.shape[0] == 0:
        return
      label_value = numpy.fromfile(label_file, numpy.int32, 1)[0]

      sess.run(enqueue_op, feed_dict={feature_input: feature_array,
                                      label_input: label_value})

# Start a thread to enqueue data asynchronously, and hide I/O latency.
t = threading.Thread(target=load_and_enqueue)
t.start()

for _ in range(TRAINING_EPOCHS):
  sess.run(train_op)
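
A possible extension (a sketch, not part of the original answer): the tf.train.Coordinator mentioned in step 5 can manage several enqueue threads and shut them down cleanly. It assumes load_and_enqueue() loops on `while not coord.should_stop():` instead of `while True:`, and that each thread is given a different part of the data to read, since each one opens its own file handles:

coord = tf.train.Coordinator()
NUM_ENQUEUE_THREADS = 4  # assumed value; tune for your I/O and preprocessing cost.

threads = [threading.Thread(target=load_and_enqueue)
           for _ in range(NUM_ENQUEUE_THREADS)]
for t in threads:
  t.start()

for _ in range(TRAINING_EPOCHS):
  sess.run(train_op)

# Ask the enqueue threads to stop, unblock any pending queue operations,
# then wait for the threads to exit.
coord.request_stop()
sess.run(q.close(cancel_pending_enqueues=True))
coord.join(threads)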
mrry
  • Thanks for your advice. I have another question. In my experiment, training feature and label are stored in two separate binary files. Should I build two queues, one for feature and one for label? If we want to get a random pair (feature, label) from the two queues, how do I make sure the feature corresponds to the correct label? In other words, how can I guarantee the one-to-one mapping? – read Read Jan 05 '16 at 01:23
  • To keep the one-to-one mapping, you should build a single queue where each element is a tuple of a feature tensor and a label tensor. You can do this by specifying a list of types (and shapes) to the queue constructor. This ensures that components of the same tuple are always dequeued together. – mrry Jan 05 '16 at 04:35
  • The features and labels are stored separately in two big binary files. So I need to build feat_queue = tf.train.string_input_producer(feat_filenames) and label_queue= tf.train.string_input_producer(label_filenames). Then I will also have two tf.FixedLengthRecordReader to get feat from feat_queue and label from label_queue separately. Finally I enqueue [feat, label] to another queue. Here is the problem. When I use FixedLengthRecordReader to get feat and label, are they always mapped correctly? – read Read Jan 05 '16 at 06:11
  • As long as you run the two `read()` ops and the `enqueue()` op in the same call to `Session.run()`, and there's only a single thread running that subgraph at once, the mapping will be preserved. (Note that you might find it easier to implement all of the reading logic in Python, e.g. using `numpy.fromfile()` to read a batch from each file, and then enqueue a batch of records at a time. This approach might also be more efficient if you have a large number of small records.) – mrry Jan 05 '16 at 06:19
  • If I use numpy.fromfile to load a small block from a big binary file, can we hide the I/O latency? For example, I define a function called load_and_enqueue which does numpy.fromfile() and enqueue(). I am not sure if load_and_enqueue() will be executed by another thread when the GPU is doing computation. Can you give an example of making load_and_enqueue() a subgraph? – read Read Jan 05 '16 at 14:04
  • I added an example to make it clearer. TL;DR: if you call `sess.run()` from two different threads, they will run in parallel. – mrry Jan 05 '16 at 15:42
  • Thanks for your example! I have a final question. If I use many threads for load_and_enqueue(), will the feature-to-label mapping be preserved? I need to insert some data preprocessing code (e.g. data distortion) into load_and_enqueue(), so extra latency will be introduced. I hope I can use more threads to hide the extra latency (data preprocessing latency). – read Read Jan 05 '16 at 16:30
  • Yes, adding more threads is fine: as long as the feature-to-label mapping is preserved at the point when you call `sess.run()`, the queue will maintain that mapping. (Note that if your distortion can be implemented in TensorFlow, you might be better off having one thread running `load_and_enqueue()`, and a set of threads that dequeue items from the input queue, apply the distortion, and enqueue the distorted data to another queue, `q2`. The training step would use `q2` as input. The [HOWTO](https://www.tensorflow.org/versions/master/how_tos/reading_data/index.html#preprocessing) explains this.) – mrry Jan 05 '16 at 16:39
  • In your example, the queue is part of the graph. What if during evaluation of the model I would like to use feed_dict={feature_input: a, label_input: b} to directly inject some online data into the graph nodes (i.e. the two placeholders: feature_input and label_input)? Does the graph automatically ignore the queue nodes below the two placeholders? – read Read Jan 05 '16 at 16:48
  • Yes, that would work. We often do that in order to share the same model for training (using the queue(s)) and inference (feeding the input directly). – mrry Jan 05 '16 at 16:50
  • That's a nice feature. Sorry to bother you again. If I use many threads for load_and_enqueue() on sequential data (e.g. each (feat, label) pair is a time step), will the time order of the pairs be preserved? It seems that many threads can be reading from the two big binary files at the same moment, and the binary file offset is changed by many threads. Will this lead to reading the same chunk of data more than once? – read Read Jan 05 '16 at 17:07
  • That depends on how you write your multi-threaded Python code. If each thread opens its own thread-local file object for the files, then the threads should not interfere, but you will need to write more code to partition the file between the threads so that each thread reads different data. (For example, you could give each thread a different offset within the file and use `seek()` to move each thread to a different position.) – mrry Jan 05 '16 at 19:44
  • I appended a working example to the question. Could you take a look and see if it is correct/efficient? – read Read Jan 05 '16 at 20:56
  • @mrry how do you pass parameters to `load_and_enqueue` function? sorry may be this is obvious, but I am new to python... – Dims Feb 03 '16 at 16:01
  • @mrry may be you forgot `args=(sess,enqueue_op, coord)` as it was at question author? – Dims Feb 03 '16 at 16:02
  • Ah, that was a copy-paste error. In fact, `load_and_enqueue()` shouldn't need any arguments because the nested function can capture the variables above. You're correct that adding `args=(...)` to the `Thread` constructor is another appropriate way to pass args if they're required. – mrry Feb 03 '16 at 16:26
  • I think I'm missing something. If `TRAINING_EPOCHS` is, say, 2, `load_and_enqueue` will run through the feature / label files once and then return, causing the thread to exit. TensorFlow has no ability to invoke this function again, and so it will be trying to get data that isn't there and will hang indefinitely. – eriophora Mar 28 '16 at 21:06
  • Should tf.float32 and tf.int32 be swapped around in q = tf.FIFOQueue(100, [tf.float32, tf.int32], shapes=[[100], []])? – Greg Cawthorne Apr 29 '17 at 15:50
  • @GregCawthorne, thanks for catching that. I updated the answer. – mrry May 01 '17 at 16:18
  • Thank you for your answer, sir. The placeholders in the queue, are they shared variables or separate?@mrry – Tengerye Aug 16 '17 at 07:43

In other words, one thread does data preprocessing and the other does training. Is this possible in TensorFlow?

Yes, it is. mrry's solution works, but a simpler one exists.

Fetching data

tf.py_func wraps a Python function and uses it as a TensorFlow operator, so we can load the data each time sess.run() is called. The problem with this approach is that the data is loaded on the main thread during sess.run().

A minimal example:

import numpy as np
import tensorflow as tf

def get_numpy_tensor():
  return np.array([[1,2],[3,4]], dtype=np.float32)
tensorflow_tensor = tf.py_func(get_numpy_tensor, [], tf.float32)

A more complex example:

def get_numpy_tensors():
  # Load data from the disk into numpy arrays.
  input = np.array([[1,2],[3,4]], dtype=np.float32)
  target = np.int32(1)
  return input, target
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32])

tensorflow_input, tensorflow_target = 2*tensorflow_input, 2*tensorflow_target

sess = tf.InteractiveSession()
numpy_input, numpy_target = sess.run([tensorflow_input, tensorflow_target])
assert np.all(numpy_input==np.array([[2,4],[6,8]])) and numpy_target==2

Prefetching data in another thread

To queue our data in another thread (so that sess.run() won't have to wait for the data), we can use tf.train.batch() on our operators from tf.py_func().

A minimal example:

tensor_shape = get_numpy_tensor().shape
tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32, shapes=[tensor_shape])
# Run `tf.train.start_queue_runners()` once session is created.

We can omit the argument shapes if tensorflow_tensor has its shape specified:

tensor_shape = get_numpy_tensor().shape
tensorflow_tensor.set_shape(tensor_shape)
tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32)
# Run `tf.train.start_queue_runners()` once session is created.

A more complex example:

input_shape, target_shape = (2, 2), ()
def get_numpy_tensors():
  input = np.random.rand(*input_shape).astype(np.float32)
  target = np.random.randint(10, dtype=np.int32)
  print('f', end='')
  return input, target
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32])
batch_size = 2
tensorflow_inputs, tensorflow_targets = tf.train.batch([tensorflow_input, tensorflow_target], batch_size, shapes=[input_shape, target_shape], capacity=2)
# The internal queue will hold at most `capacity=2` elements, each a pair `[tensorflow_input, tensorflow_target]`.

tensorflow_inputs, tensorflow_targets = 2*tensorflow_inputs, 2*tensorflow_targets

sess = tf.InteractiveSession()
tf.train.start_queue_runners() # Internally, `tf.train.batch` uses a QueueRunner, so we need to ask tf to start it.
for _ in range(10):
  numpy_inputs, numpy_targets = sess.run([tensorflow_inputs, tensorflow_targets])
  assert numpy_inputs.shape==(batch_size, *input_shape) and numpy_targets.shape==(batch_size, *target_shape)
  print('r', end='')

# Prints `fffffrrffrfrffrffrffrffrffrffrf`.

If get_numpy_tensor() returns a batch of tensors, tf.train.batch(..., enqueue_many=True) will help; a sketch follows.
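
A sketch (with assumed shapes and names, not part of the original answer) of a loader that returns a whole chunk of examples per call:

def get_numpy_batch():
  # The first dimension indexes the examples in the loaded chunk.
  inputs = np.random.rand(8, 2, 2).astype(np.float32)
  targets = np.random.randint(10, size=8).astype(np.int32)
  return inputs, targets
tf_inputs, tf_targets = tf.py_func(get_numpy_batch, [], [tf.float32, tf.int32])
# With enqueue_many=True, each row along the first dimension is enqueued as a
# separate example, so `shapes` still describes a single example.
tf_input_batch, tf_target_batch = tf.train.batch(
    [tf_inputs, tf_targets], batch_size=32,
    shapes=[(2, 2), ()], enqueue_many=True)
# As above, run `tf.train.start_queue_runners()` once the session is created.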

AlexP