0

So I have this toy example code;

import glob
from tqdm import tqdm
import tensorflow as tf

imgPaths = glob.glob("/home/msmith/imgs/*/*") # Some images

filenameQ = tf.train.string_input_producer(imgPaths)
reader = tf.WholeFileReader()
key, value = reader.read(filenameQ)

img = tf.image.decode_jpeg(value)
init_op = tf.initialize_all_variables()

with tf.Session() as sess:
    sess.run(init_op)
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    for i in tqdm(range(10000)):
        img.eval().mean()

which loads images and prints the mean of each one. How to I edit it so it's multithreading the loading part of the images, which is at the moment my bottleneck on my tf image scripts.

mattdns
  • 894
  • 1
  • 11
  • 26
  • I would take a look at [QueueRunner](https://www.tensorflow.org/versions/r0.11/how_tos/threading_and_queues/index.html#queuerunner) class, although it's not clear to me how to connect it with a pre-built reader. – sygi Nov 25 '16 at 11:53

1 Answers1

3

EDIT (2018/3/5): It's now easier to get the same results using the tf.data API.

import glob
from tqdm import tqdm
import tensorflow as tf

imgPaths = glob.glob("/home/msmith/imgs/*/*") # Some images

dataset = (tf.data.Dataset.from_tensor_slices(imgPaths)
           .map(lambda x: tf.reduce_mean(tf.decode_jpeg(tf.read_file(x))),
                num_parallel_calls=16)
           .prefetch(128))

iterator = dataset.make_one_shot_iterator()
next_mean = iterator.get_next()

with tf.Session() as sess:
    for i in tqdm(range(10000)):
        sess.run(next_mean)

As sygi suggests in their comment, a tf.train.QueueRunner can be used to define some ops that run in a separate thread, and (typically) enqueue values into a TensorFlow queue.

import glob
from tqdm import tqdm
import tensorflow as tf

imgPaths = glob.glob("/home/msmith/imgs/*/*") # Some images

filenameQ = tf.train.string_input_producer(imgPaths)

# Define a subgraph that takes a filename, reads the file, decodes it, and                                                                                     
# enqueues it.                                                                                                                                                 
filename = filenameQ.dequeue()
image_bytes = tf.read_file(filename)
decoded_image = tf.image.decode_jpeg(image_bytes)
image_queue = tf.FIFOQueue(128, [tf.uint8], None)
enqueue_op = image_queue.enqueue(decoded_image)

# Create a queue runner that will enqueue decoded images into `image_queue`.                                                                                   
NUM_THREADS = 16
queue_runner = tf.train.QueueRunner(
    image_queue,
    [enqueue_op] * NUM_THREADS,  # Each element will be run from a separate thread.                                                                                       
    image_queue.close(),
    image_queue.close(cancel_pending_enqueues=True))

# Ensure that the queue runner threads are started when we call                                                                                               
# `tf.train.start_queue_runners()` below.                                                                                                                      
tf.train.add_queue_runner(queue_runner)

# Dequeue the next image from the queue, for returning to the client.                                                                                          
img = image_queue.dequeue()

init_op = tf.global_variables_initializer()

with tf.Session() as sess:
    sess.run(init_op)
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    for i in tqdm(range(10000)):
        img.eval().mean()
mrry
  • 125,488
  • 26
  • 399
  • 400
  • This is great. A few more things; if I want to do preprocessing do I do this before image_queue.dequeue()? Also when can I find out whether or not the threads have finished going through the list of inputs? – mattdns Dec 06 '16 at 16:59
  • 1
    For preprocessing, you would do this before `image_queue.dequeue()`, but you might add another queue/`QueueRunner` if you want another set of threads to perform that in parallel with the parsing. If the images all have the same size, you might find [`tf.train.batch()`](https://www.tensorflow.org/versions/r0.12/api_docs/python/io_ops.html#batch) useful for this. The easiest way to tell when the threads have finished is to use `while not coord.should_stop():` instead of the `for` loop. – mrry Dec 06 '16 at 17:12
  • Excellent. The label of the image is encoded in the string of the filename, if I can turn that into a OH vector and I want the right vector out at the right time... do I do that by adding another ```enqueue_op``` tensor for the class vector into that list ```[enqueue_op]```? By the way I can't pay the bounty for another 2 hours. – mattdns Dec 06 '16 at 17:23
  • 1
    If you want to pass the filename around with the image, you would add a second component to the queue. This ensures that the image data and the filename are paired together as they pass through the graph. The constructor would become `image_queue = tf.FIFOQueue(128, [tf.string, tf.uint8], None)`, the enqueue op would become `enqueue_op = image_queue.enqueue([filename, decoded_image])`, and the dequeue op would return two tensors: `fname, image = image_queue.dequeue()`. – mrry Dec 06 '16 at 17:25
  • what about with a csv? – mattdns Dec 07 '16 at 11:53
  • Say I can successfully decode a csv with 3 columns ```idx, label, path = tf.decode_csv(v,record_defaults=defaults)```. Would path be passed onto the subgraph ```image_bytes = tf.read_file(path)```? – mattdns Dec 07 '16 at 12:52
  • 1
    Yes, in that case you could pass `path` (extracted from the CSV) to the `tf.read_file()` op. In this case you'd probably have another reader (a `tf.TextLineReader`) upstream for reading the CSV file, and a queue of CSV filenames (e.g. from a `tf.train.string_input_producer()`) passed into its `read()` method. – mrry Dec 07 '16 at 15:55