2

I'am training a linear regression problem using tf.train.GradientDescentOptimizer() in Tensorflow. In general, I can use placeholders and feed_dict={} to input a batch of samples everytime and train the weight W. However, I would like to use tf.FIFOQueue instead of feed_dict. For example, in the following code, I input X and Y and train weight W:

v_dimen = 300
n_samples = 10000
batch_size = 32
X = tf.random_normal([n_samples, v_dimen], mean=0, stddev=1) 
Y = tf.random_normal([n_samples, 1], mean=0, stddev=1) 

q_in = tf.FIFOQueue(capacity=5, dtypes=tf.float32) # enqueue 5 batches
enqueue_op = q_in.enqueue(X)
numberOfThreads = 1
qr = tf.train.QueueRunner(q_in, [enqueue_op] * numberOfThreads)
tf.train.add_queue_runner(qr)
X_batch = q_in.dequeue()    

q_out = tf.FIFOQueue(capacity=5, dtypes=tf.float32)  # enqueue 5 batches
enqueue_op = q_out.enqueue(Y)
numberOfThreads = 1
qr = tf.train.QueueRunner(q_out, [enqueue_op] * numberOfThreads)
tf.train.add_queue_runner(qr)
Y_batch = q_out.dequeue() 

W = tf.Variable(tf.random.truncated_normal((v_dimen, 1), mean=0.0,stddev=0.001))
predicted_Y = f(X_batch) # some function on X, like tf.matmul(X_batch,W)
loss = tf.nn.l2_loss(Y_batch - predicted_Y)
optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.01).minimize(loss, var_list=[W])
init = tf.global_variables_initializer()

with tf.Session() as sess:
    sess.run(init)
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    for i in range(10000):
        sess.run([optimizer]) # would like to run on mini batches

    coord.request_stop()
    coord.join(threads)

I would like to know how to change the code to be able to train W using X_batch and Y_batch in mini-batches of size batch_size.

Mila
  • 285
  • 4
  • 13
  • Your code works, I don't see any problems. But for importing data, better using tf.data to do it. The documentation is here, https://www.tensorflow.org/guide/datasets?hl=en – LI Xuhong Jan 20 '19 at 10:37
  • Thanks @LIXuhong for the reply. But in my code, I do not specify any batch size, so I guess it only reads one sample from queue per time. I think I need to use something like `q_in.dequeue_many(batch_size)`. – Mila Jan 21 '19 at 12:11
  • @LIXuhong Ok, then you mean I can create my batches using tf.data. – Mila Jan 21 '19 at 12:24
  • sorry, I thought you did use `batch_size`... see my answer. – LI Xuhong Jan 21 '19 at 14:50

1 Answers1

2

The code using tf.data (with comments):

import tensorflow as tf

v_dimen = 300
n_samples = 10000
batch_size = 32
X = tf.random_normal([n_samples, v_dimen], mean=0, stddev=1)
Y = tf.random_normal([n_samples, 1], mean=0, stddev=1)

# X and Y are fixed once having created.
dataset = tf.data.Dataset.from_tensor_slices((X, Y))
# dataset = dataset.shuffle(n_samples)  # shuffle
dataset = dataset.repeat()  # will raise OutOfRangeError if not repeat
dataset = dataset.batch(batch_size)  # specify batch_size
iterator = dataset.make_initializable_iterator()
X_batch, Y_batch = iterator.get_next()  # like dequeue.

W = tf.Variable(tf.random.truncated_normal((v_dimen, 1), mean=0.0, stddev=0.001))
predicted_Y = tf.matmul(X_batch, W)  # some function on X, like tf.matmul(X_batch,W)
loss = tf.nn.l2_loss(Y_batch - predicted_Y)
optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.01).minimize(loss, var_list=[W])
init = [tf.global_variables_initializer(), iterator.initializer]  # iterator.initializer should be initialized.

with tf.Session() as sess:
    sess.run(init)
    for i in range(1000):
        _, x, y = sess.run([optimizer, X_batch, Y_batch])
        print(i, x.shape, y.shape, y[0])  # y[0] will be repeated after 10000 / 32 = 625 iterations. 

If you would like to use queue, which will be deprecated, then see the code below (with comments)

import tensorflow as tf

v_dimen = 300
n_samples = 100  # you don't enqueue too many elements each time.
batch_size = 32
X = tf.random_normal([n_samples, v_dimen], mean=0, stddev=1) 
Y = tf.random_normal([n_samples, 1], mean=0, stddev=1) 
# each time X and Y will be re-created when being demanded to enqueue.

# The capacity of queue is not the same as the batch size, it is just for the queue. 
# It is the upper bound on the number of elements that may be stored in this queue.
# When you want to use `dequeue_many`, which allows to specify the batch size, the `shapes` is also important.
# Because `dequeue_many` slices each component tensor along the 0th dimension to make multiple elements as output. 
# For the same reason, `enqueue_many` should be used.
# see more in the documentation of `FIFOQueue`, `enqueue_many` and `dequeue_many`.
q_in = tf.FIFOQueue(capacity=50, dtypes=tf.float32, shapes=[v_dimen]) 
enqueue_op = q_in.enqueue_many(X)
numberOfThreads = 1
qr = tf.train.QueueRunner(q_in, [enqueue_op] * numberOfThreads)
tf.train.add_queue_runner(qr)
X_batch = q_in.dequeue_many(batch_size)

q_out = tf.FIFOQueue(capacity=50, dtypes=tf.float32, shapes=[1])
enqueue_op = q_out.enqueue_many(Y)
numberOfThreads = 1
qr = tf.train.QueueRunner(q_out, [enqueue_op] * numberOfThreads)
tf.train.add_queue_runner(qr)
Y_batch = q_out.dequeue_many(batch_size)

W = tf.Variable(tf.random.truncated_normal((v_dimen, 1), mean=0.0,stddev=0.001))
predicted_Y = tf.matmul(X_batch,W) # some function on X, like tf.matmul(X_batch,W)
loss = tf.nn.l2_loss(Y_batch - predicted_Y)
optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.01).minimize(loss, var_list=[W])
init = tf.global_variables_initializer()

with tf.Session() as sess:
    sess.run(init)
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    for i in range(1000):
        sess.run([optimizer])

    coord.request_stop()
    coord.join(threads)
LI Xuhong
  • 2,339
  • 2
  • 17
  • 32