28

This is how-to which I believe is missed from TF examples.

Task:

  1. samples for each class are given in separate dir and thus labels are indirect (i.e. by dir)
  2. decoupled load and computations in TF

Each separate bit could be found, however I think have them all together in one place will help to save a lot of time for TF beginners (like myself).

Lets tackle 1. in my case it is two sets of images:

# all filenames for .jpg in dir 
#  - list of fnames
#  - list of labels 
def path_fnames(f_path, label, ext = ['.jpg', '.jpeg']):
    f_n = [f_path+'/'+f for f in sorted(os.listdir(f_path)) if os.path.splitext(f)[1].lower() in ext]
    f_l = [label] * len(f_n)
    return f_n, f_l
#     
def dense_to_one_hot(labels_dense, num_classes=10, dtype=np.float32):
    """Convert class labels from scalars to one-hot vectors."""
    num_labels     = labels_dense.shape[0]
    index_offset   = np.arange(num_labels) * num_classes
    labels_one_hot = np.zeros((num_labels, num_classes),dtype=dtype)
    labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
    return labels_one_hot

data_dir = '/mnt/dataset/'
dir_1   = '/class_1'
dir_2   = '/class_2'

# --- get filenames for data ---
dpath = [data_dir+dir_1, data_dir+dir_2]

f_n1, f_l1 = path_fnames(dpath[0], 0)
f_n2, f_l2 = path_fnames(dpath[1], 1)

# --- create one-hot labels ---
ohl    = dense_to_one_hot(np.asarray(f_l1+f_l2), num_classes=2, dtype = np.float32)
fnames = f_n1+f_n2;               # one-hot labels created in this sequence

Now we have all file-names and one-hot labels preloaded.

Lets move to the 2.

It is based on How to prefetch data using a custom python function in tensorflow. In short it has:

  • custom image-reader (replace with yours)
  • queue fnl_q with [filename label] which is used by reader to feed
  • queue proc_q with [sample label] which is used to feed processing some_op
  • thread which perform read_op to get [sample label] and enqueue_op to put pair into proc_q. Thread is controlled by tf.Coordinator
  • some_op which first get data from proc_q by dequeue_many() and rest of computation (also could be put in separate thread).

Notes:

  • feature_read_op and label_read_op are two separate ops.
  • I use sleep() to slow down and control op - only for test purposes
  • i have separated "feeding" and "calculation" parts - in real case just run them in parallel
print 'TF version:', tf.__version__
# --- params ----
im_s       = [30, 30, 1]   # target image size
BATCH_SIZE = 16

# image reader 
# - fnl_queue: queue with [fn l] pairs 
# Notes 
# - to resize:  image_tensor = tf.image.resize_image_with_crop_or_pad(image_tensor, HEIGHT, WIDTH)
# - how about image preprocessing?
def img_reader_jpg(fnl_queue, ch = 3, keep = False):
    fn, label = fnl_queue.dequeue()

    if keep:
        fnl_queue.enqueue([fn, label])

    img_bytes = tf.read_file(fn)
    img_u8    = tf.image.decode_jpeg(img_bytes, channels=ch) 
    img_f32   = tf.cast(img_u8, tf.float32)/256.0  
    #img_4     = tf.expand_dims(img_f32,0)
    return img_f32, label

#  load [feature, label] and enqueue to processing queue
# - sess:             tf session 
# - sess:             tf Coordinator
# - [fr_op, lr_op ]:  feature_read_op label_read_op
# - enqueue_op:       [f l] pairs enqueue op
def load_and_enqueue(sess, coord, feature_read_op, label_read_op , enqueue_op):
    i = 0
    while not coord.should_stop():
        # for testing purpose
        time.sleep(0.1)                     
        #print 'load_and_enqueue i=',i
        #i = i +1

        feature, label = sess.run([feature_read_op, label_read_op ])

        feed_dict = {feature_input: feature,
                     label_input  : label}

        sess.run(enqueue_op, feed_dict=feed_dict)


# --- TF part ---

# filenames and labels are pre-loaded
fv = tf.constant(fnames)
lv = tf.constant(ohl)

#fnl_q    = tf.FIFOQueue(len(fnames), [tf.string, tf.float32])
fnl_q    = tf.RandomShuffleQueue(len(fnames), 0, [tf.string, tf.float32])
do_enq = fnl_q.enqueue_many([fv, lv])

# reading_op: feature_read_op label_read_op 
feature_read_op, label_read_op = img_reader_jpg(fnl_q, ch = im_s[2])

# samples queue
f_s = im_s
l_s = 2
feature_input = tf.placeholder(tf.float32, shape=f_s, name='feature_input')
label_input   = tf.placeholder(tf.float32, shape=l_s, name='label_input')

#proc_q     = tf.RandomShuffleQueue(len(fnames), 0, [tf.float32, tf.float32], shapes=[f_s, l_s])
proc_q     = tf.FIFOQueue(len(fnames), [tf.float32, tf.float32], shapes=[f_s, l_s])
enqueue_op = proc_q.enqueue([feature_input, label_input])

# test: 
# - some op
img_batch, lab_batch = proc_q.dequeue_many(BATCH_SIZE)
some_op   = [img_batch, lab_batch]

# service ops
init_op   = tf.initialize_all_variables()

# let run stuff
with tf.Session() as sess:

    sess.run(init_op)
    sess.run(do_enq)

    print "fnl_q.size:", fnl_q.size().eval()
    print "proc_q.size:", proc_q.size().eval()

    # --- test thread stuff ---
    #  - fill proc_q
    coord = tf.train.Coordinator()
    t = threading.Thread(target=load_and_enqueue, args = (sess, coord, feature_read_op, label_read_op , enqueue_op))
    t.start()

    time.sleep(2.1)

    coord.request_stop()
    coord.join([t])

    print "fnl_q.size:", fnl_q.size().eval()
    print "proc_q.size:", proc_q.size().eval()

    #  - process a bit 
    ss = sess.run(some_op)
    print 'ss[0].shape', ss[0].shape 
    print ' ss[1]:\n', ss[1]

    print "fnl_q.size:", fnl_q.size().eval()
    print "proc_q.size:", proc_q.size().eval() 

print 'ok'

Typical output:

TF version: 0.6.0

fnl_q.size: 1225
proc_q.size: 0

fnl_q.size: 1204
proc_q.size: 21

ss[0].shape (16, 30, 30, 1)
 ss[1]:
[[ 0.  1.]
 [ 1.  0.]
 [ 1.  0.]
 [ 0.  1.]
 [ 0.  1.]
 [ 1.  0.]
 [ 1.  0.]
 [ 0.  1.]
 [ 1.  0.]
 [ 0.  1.]
 [ 0.  1.]
 [ 1.  0.]
 [ 1.  0.]
 [ 0.  1.]
 [ 1.  0.]
 [ 0.  1.]]

fnl_q.size: 1204
proc_q.size: 5

ok  

All as expected

  • batch of pairs [sample label] are created
  • pairs are shuffled

Only thing left is to apply TF as it is intended to be used by replacing some_op :)

And a question: one observed problem problem - in case I use tf.FIFOQueue for file-names and tf.RandomShuffleQueue for samples - shuffling doesn't happen. However other way around (as it code above) it does shuffle perfectly.

Any problem with shuffling for
tf.RandomShuffleQueue(len(fnames), 0, [tf.float32, tf.float32], shapes=[f_s, l_s]) ?


ADD: The version with two threads:

  • one for re-fill/update/change file name queue
  • second for fill samples to processing queue.

Also added correct way to stop threads.

def load_and_enqueue(sess, coord, feature_read_op, label_read_op , enqueue_op):
    try:
        while not coord.should_stop():
            feature, label = sess.run([feature_read_op, label_read_op ])
            feed_dict = {feature_input: feature,
                         label_input  : label}
            sess.run(enqueue_op, feed_dict=feed_dict)
    except Exception as e:
        return


# periodically check the state of fnl queue and if needed refill it
#  - enqueue_op: 'refill' file-name_label queue 
def enqueue_fnl(sess, coord, fnl_q, enqueue_op):
    try:
        while not coord.should_stop():
            time.sleep(0.5)
            s = sess.run(fnl_q.size())
            if  s < (9*BATCH_SIZE) :
                sess.run(enqueue_op)
    except Exception as e:
        return


#  -- ops for feed part --

# filenames and labels are pre-loaded
fv = tf.constant(fnames)
lv = tf.constant(ohl)

# read op
fnl_q      = tf.RandomShuffleQueue(len(fnames)*2, 0, [tf.string, tf.float32], name = 'fnl_q')  # add some margin for re-fill to fit
do_fnl_enq = fnl_q.enqueue_many([fv, lv])
feature_read_op, label_read_op = img_reader_jpg(fnl_q, ch = IMG_SIZE[2])

# samples queue
feature_input = tf.placeholder(tf.float32, shape=IMG_SIZE, name='feature_input')
label_input   = tf.placeholder(tf.float32, shape=LAB_SIZE, name='label_input')
proc_q        = tf.FIFOQueue(len(fnames)*3, [tf.float32, tf.float32], shapes=[IMG_SIZE, LAB_SIZE], name = 'fe_la_q') 
enqueue_op    = proc_q.enqueue([feature_input, label_input])

# -- ops for trainind end eval
img_batch, lab_batch = proc_q.dequeue_many(BATCH_SIZE)

... here is your model

loss       = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits, lab_ph))
optimizer  = tf.train.AdamOptimizer(1e-4).minimize(loss)

with tf.Session() as sess:

    coord = tf.train.Coordinator()
    t_le  = threading.Thread(target=load_and_enqueue, args = (sess, coord, feature_read_op, label_read_op , enqueue_op) , name = 'load_and_enqueue')
    t_re  = threading.Thread(target=enqueue_fnl, args = (sess, coord, fnl_q, do_fnl_enq), name = 'enqueue_fnl')  # re-enq thread i.e. refiling filename queue 
    t_le.start()
    t_re.start()

    try:
    # training
    for step in xrange(823):
        # some proc
        img_v, lab_v = sess.run([img_batch, lab_batch])
        feed_dict = { img_ph   : img_v,
              lab_ph   : lab_v,
              keep_prob: 0.7}
        _, loss_v = sess.run([optimizer, loss], feed_dict = feed_dict)

    except Exception as e:
    print 'Training: Exception:', e


    # stop threads 
    coord.request_stop()                                     # ask to stop
    sess.run(fnl_q.close(cancel_pending_enqueues=True))      # tell proc_q don't wait for enque anymore
    sess.run(proc_q.close(cancel_pending_enqueues=True))     # tell proc_q don't wait for enque anymore
    coord.join([t_le, t_re], stop_grace_period_secs=8)       
Community
  • 1
  • 1
rgr
  • 543
  • 2
  • 6
  • 13
  • 2
    This tutorial deserves a lot more attention! Have been looking for something like that for a long time. – mackcmillion Feb 13 '16 at 19:44
  • 2
    Just one comment - one need to take care that all queues are feed properly. As easiest way to stuck is to have blocking dequeue op waiting for queue to be filled by other thread. I ended up running one more thread to re-fill **fnl_q** – rgr Feb 14 '16 at 20:44
  • 1
    This does look interesting, thanks! We welcome pull-requests for documentation, so if you'd like to move this how-to into https://github.com/tensorflow/tensorflow/tree/master/tensorflow/g3doc/how_tos/ and put the code into tensorflow/examples, we could review it. – Pete Warden Jun 07 '16 at 01:09
  • 2
    @PeteWarden this method was very helpful for me back then. It is also evolved a bit in my usage. There are two threads as i said in prev comment. Also stopping the things requires a bit of care. I think i need to put updated version here below and then it is ok to move to how-to. I will do it in few days. – rgr Jun 08 '16 at 18:24
  • 1
    Thanks @rgr, that's great to hear! – Pete Warden Jun 13 '16 at 23:15
  • @rgr Thanks for posting this excellent example! My question is that could you possibly set the capacity in your queue to be `9*BATCHSIZE` and then drop the periodical checks. I think the queue won't be filled if it's full. Is there some reason against not doing this? – Pekka Sep 02 '16 at 14:58
  • @Pekka, you are right here: en-queue op will just hang till there is a space in the queue to put data. back then this slip away from me.. – rgr Sep 03 '16 at 20:18
  • You should add this to the new documentation section as an example! – Engineero Jun 08 '17 at 23:32

0 Answers0