This is a how-to which I believe is missing from the TF examples.
Task:
- samples for each class are given in a separate dir, so the labels are indirect (i.e. encoded by dir)
- loading and computation are decoupled in TF
Each separate bit can be found elsewhere, but I think having them all together in one place will save a lot of time for TF beginners (like myself).
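For concreteness, here is the directory layout the code below assumes (the dataset path itself is just an example):
/mnt/dataset/
    class_1/   # all .jpg samples for label 0
    class_2/   # all .jpg samples for label 1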
Let's tackle 1. In my case it is two sets of images:
import os
import threading
import time
import numpy as np
import tensorflow as tf

# all filenames for .jpg in dir
# - list of fnames
# - list of labels
def path_fnames(f_path, label, ext=['.jpg', '.jpeg']):
    f_n = [f_path + '/' + f for f in sorted(os.listdir(f_path)) if os.path.splitext(f)[1].lower() in ext]
    f_l = [label] * len(f_n)
    return f_n, f_l
#
def dense_to_one_hot(labels_dense, num_classes=10, dtype=np.float32):
    """Convert class labels from scalars to one-hot vectors."""
    num_labels = labels_dense.shape[0]
    index_offset = np.arange(num_labels) * num_classes
    labels_one_hot = np.zeros((num_labels, num_classes), dtype=dtype)
    labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
    return labels_one_hot
data_dir = '/mnt/dataset/'
dir_1 = '/class_1'
dir_2 = '/class_2'
# --- get filenames for data ---
dpath = [data_dir+dir_1, data_dir+dir_2]
f_n1, f_l1 = path_fnames(dpath[0], 0)
f_n2, f_l2 = path_fnames(dpath[1], 1)
# --- create one-hot labels ---
ohl = dense_to_one_hot(np.asarray(f_l1 + f_l2), num_classes=2, dtype=np.float32)
fnames = f_n1 + f_n2  # same order as the one-hot labels created above
Now we have all file-names and one-hot labels preloaded.
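As a quick sanity check (the dense label values here are hypothetical, just to show what dense_to_one_hot produces):
# filenames and one-hot labels must line up one-to-one
assert len(fnames) == ohl.shape[0]
print 'total samples:', len(fnames)
# e.g. dense labels [0, 1, 1] become one-hot rows:
print dense_to_one_hot(np.asarray([0, 1, 1]), num_classes=2)
# [[ 1.  0.]
#  [ 0.  1.]
#  [ 0.  1.]]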
Let's move on to 2.
It is based on "How to prefetch data using a custom python function in tensorflow". In short it has:
- a custom image reader (replace it with yours)
- a queue fnl_q with [filename, label] pairs, which feeds the reader
- a queue proc_q with [sample, label] pairs, which feeds the processing some_op
- a thread which performs read_op to get a [sample, label] pair and enqueue_op to put the pair into proc_q; the thread is controlled by tf.Coordinator
- some_op, which first gets data from proc_q via dequeue_many() and then does the rest of the computation (this could also be put in a separate thread).
Notes:
- feature_read_op and label_read_op are two separate ops.
- I use sleep() to slow down and control the ops - for test purposes only
- I have separated the "feeding" and "calculation" parts - in a real case just run them in parallel
print 'TF version:', tf.__version__
# --- params ----
im_s = [30, 30, 1] # target image size
BATCH_SIZE = 16
# image reader
# - fnl_queue: queue with [fn l] pairs
# Notes
# - to resize: image_tensor = tf.image.resize_image_with_crop_or_pad(image_tensor, HEIGHT, WIDTH)
# - how about image preprocessing?
def img_reader_jpg(fnl_queue, ch=3, keep=False):
    fn, label = fnl_queue.dequeue()
    if keep:
        fnl_queue.enqueue([fn, label])
    img_bytes = tf.read_file(fn)
    img_u8 = tf.image.decode_jpeg(img_bytes, channels=ch)
    img_f32 = tf.cast(img_u8, tf.float32) / 256.0
    #img_4 = tf.expand_dims(img_f32, 0)
    return img_f32, label
# load [feature, label] and enqueue to processing queue
# - sess: tf session
# - coord: tf Coordinator
# - [fr_op, lr_op]: feature_read_op, label_read_op
# - enqueue_op: enqueue op for [f, l] pairs
def load_and_enqueue(sess, coord, feature_read_op, label_read_op, enqueue_op):
    i = 0
    while not coord.should_stop():
        # for testing purposes
        time.sleep(0.1)
        #print 'load_and_enqueue i=', i
        #i = i + 1
        feature, label = sess.run([feature_read_op, label_read_op])
        feed_dict = {feature_input: feature,
                     label_input: label}
        sess.run(enqueue_op, feed_dict=feed_dict)
# --- TF part ---
# filenames and labels are pre-loaded
fv = tf.constant(fnames)
lv = tf.constant(ohl)
#fnl_q = tf.FIFOQueue(len(fnames), [tf.string, tf.float32])
fnl_q = tf.RandomShuffleQueue(len(fnames), 0, [tf.string, tf.float32])
do_enq = fnl_q.enqueue_many([fv, lv])
# reading_op: feature_read_op label_read_op
feature_read_op, label_read_op = img_reader_jpg(fnl_q, ch = im_s[2])
# samples queue
f_s = im_s
l_s = 2
feature_input = tf.placeholder(tf.float32, shape=f_s, name='feature_input')
label_input = tf.placeholder(tf.float32, shape=l_s, name='label_input')
#proc_q = tf.RandomShuffleQueue(len(fnames), 0, [tf.float32, tf.float32], shapes=[f_s, l_s])
proc_q = tf.FIFOQueue(len(fnames), [tf.float32, tf.float32], shapes=[f_s, l_s])
enqueue_op = proc_q.enqueue([feature_input, label_input])
# test:
# - some op
img_batch, lab_batch = proc_q.dequeue_many(BATCH_SIZE)
some_op = [img_batch, lab_batch]
# service ops
init_op = tf.initialize_all_variables()
# let run stuff
with tf.Session() as sess:
    sess.run(init_op)
    sess.run(do_enq)
    print "fnl_q.size:", fnl_q.size().eval()
    print "proc_q.size:", proc_q.size().eval()
    # --- test thread stuff ---
    # - fill proc_q
    coord = tf.train.Coordinator()
    t = threading.Thread(target=load_and_enqueue, args=(sess, coord, feature_read_op, label_read_op, enqueue_op))
    t.start()
    time.sleep(2.1)
    coord.request_stop()
    coord.join([t])
    print "fnl_q.size:", fnl_q.size().eval()
    print "proc_q.size:", proc_q.size().eval()
    # - process a bit
    ss = sess.run(some_op)
    print 'ss[0].shape', ss[0].shape
    print ' ss[1]:\n', ss[1]
    print "fnl_q.size:", fnl_q.size().eval()
    print "proc_q.size:", proc_q.size().eval()
    print 'ok'
Typical output:
TF version: 0.6.0
fnl_q.size: 1225
proc_q.size: 0
fnl_q.size: 1204
proc_q.size: 21
ss[0].shape (16, 30, 30, 1)
ss[1]:
[[ 0. 1.]
[ 1. 0.]
[ 1. 0.]
[ 0. 1.]
[ 0. 1.]
[ 1. 0.]
[ 1. 0.]
[ 0. 1.]
[ 1. 0.]
[ 0. 1.]
[ 0. 1.]
[ 1. 0.]
[ 1. 0.]
[ 0. 1.]
[ 1. 0.]
[ 0. 1.]]
fnl_q.size: 1204
proc_q.size: 5
ok
All as expected:
- batches of [sample, label] pairs are created
- the pairs are shuffled
The only thing left is to apply TF as it is intended to be used, by replacing some_op with real computation :)
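For example, here is a minimal sketch of what could replace some_op - a single-layer softmax classifier fed straight from proc_q (the model itself is purely illustrative, not part of the pipeline above):
# illustrative model: single-layer softmax on flattened images
n_in = im_s[0] * im_s[1] * im_s[2]
img_flat = tf.reshape(img_batch, [BATCH_SIZE, n_in])
W = tf.Variable(tf.zeros([n_in, 2]))
b = tf.Variable(tf.zeros([2]))
logits = tf.matmul(img_flat, W) + b
loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits, lab_batch))
train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss)
# each sess.run(train_op) dequeues one batch from proc_q and does one training step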
And a question:
One observed problem: if I use tf.FIFOQueue for file-names and tf.RandomShuffleQueue for samples, shuffling doesn't happen. However, the other way around (as in the code above) it shuffles perfectly.
Is there any problem with shuffling for
tf.RandomShuffleQueue(len(fnames), 0, [tf.float32, tf.float32], shapes=[f_s, l_s])
?
ADD: The version with two threads:
- one to re-fill/update/change the file-name queue
- a second to fill the samples into the processing queue.
Also added the correct way to stop the threads.
def load_and_enqueue(sess, coord, feature_read_op, label_read_op, enqueue_op):
    try:
        while not coord.should_stop():
            feature, label = sess.run([feature_read_op, label_read_op])
            feed_dict = {feature_input: feature,
                         label_input: label}
            sess.run(enqueue_op, feed_dict=feed_dict)
    except Exception as e:
        return
# periodically check the state of the fnl queue and refill it if needed
# - enqueue_op: op that 'refills' the file-name/label queue
def enqueue_fnl(sess, coord, fnl_q, enqueue_op):
    try:
        while not coord.should_stop():
            time.sleep(0.5)
            s = sess.run(fnl_q.size())
            if s < (9 * BATCH_SIZE):
                sess.run(enqueue_op)
    except Exception as e:
        return
# -- ops for feed part --
# filenames and labels are pre-loaded
fv = tf.constant(fnames)
lv = tf.constant(ohl)
# read op
fnl_q = tf.RandomShuffleQueue(len(fnames)*2, 0, [tf.string, tf.float32], name = 'fnl_q') # add some margin for re-fill to fit
do_fnl_enq = fnl_q.enqueue_many([fv, lv])
feature_read_op, label_read_op = img_reader_jpg(fnl_q, ch = IMG_SIZE[2])
# samples queue
feature_input = tf.placeholder(tf.float32, shape=IMG_SIZE, name='feature_input')
label_input = tf.placeholder(tf.float32, shape=LAB_SIZE, name='label_input')
proc_q = tf.FIFOQueue(len(fnames)*3, [tf.float32, tf.float32], shapes=[IMG_SIZE, LAB_SIZE], name = 'fe_la_q')
enqueue_op = proc_q.enqueue([feature_input, label_input])
# -- ops for training and eval
img_batch, lab_batch = proc_q.dequeue_many(BATCH_SIZE)
... here is your model
loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits, lab_ph))
optimizer = tf.train.AdamOptimizer(1e-4).minimize(loss)
with tf.Session() as sess:
    coord = tf.train.Coordinator()
    t_le = threading.Thread(target=load_and_enqueue, args=(sess, coord, feature_read_op, label_read_op, enqueue_op), name='load_and_enqueue')
    t_re = threading.Thread(target=enqueue_fnl, args=(sess, coord, fnl_q, do_fnl_enq), name='enqueue_fnl')  # re-enq thread, i.e. refills the filename queue
    t_le.start()
    t_re.start()
    try:
        # training
        for step in xrange(823):
            # some proc
            img_v, lab_v = sess.run([img_batch, lab_batch])
            feed_dict = {img_ph: img_v,
                         lab_ph: lab_v,
                         keep_prob: 0.7}
            _, loss_v = sess.run([optimizer, loss], feed_dict=feed_dict)
    except Exception as e:
        print 'Training: Exception:', e
    # stop threads
    coord.request_stop()                                  # ask threads to stop
    sess.run(fnl_q.close(cancel_pending_enqueues=True))   # tell fnl_q not to wait for enqueues anymore
    sess.run(proc_q.close(cancel_pending_enqueues=True))  # tell proc_q not to wait for enqueues anymore
    coord.join([t_le, t_re], stop_grace_period_secs=8)