
I spent many hours searching for a correct answer to my problem without result, so here I am. I think I'm missing something obvious, but I can't figure out what...

Problem: using a queue to read a CSV file and train an Estimator through input_fn without reloading the graph every time (which is very slow).


I created a custom model that gives me a model_fn function for building my own estimator:

estimator = tf.estimator.Estimator(model_fn=model_fn, params=model_params)

After that, I need to read a very large CSV file (it can't be loaded into memory), so I decided to use a queue (it seems to be the best solution):

nb_features = 10
queue = tf.train.string_input_producer(["test.csv"],
                                       shuffle=False)
reader = tf.TextLineReader()
key, value = reader.read(queue)

record_defaults = [[0] for _ in range(nb_features+1)]
cols = tf.decode_csv(value, record_defaults=record_defaults)
features = tf.stack(cols[0:len(cols)-1]) # Take all columns without the last
label = tf.stack(cols[len(cols)-1]) # Take last column

I think this code is OK.


Then, the main code:

with tf.Session() as sess:
    tf.logging.set_verbosity(tf.logging.INFO)
    sess.run(tf.global_variables_initializer())

    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    # Return a Tensor of 1000 features/labels
    def get_inputs():
        print("input call !")
        xs = []
        ys = []
        for i in range(1000):
            x, y = sess.run([features, label])
            xs.append(x)
            ys.append(y)
        return tf.constant(np.asarray(xs), dtype=tf.float32), tf.constant(np.asarray(ys))

    estimator.train(input_fn=get_inputs,
                   steps=100)

    coord.request_stop()
    coord.join(threads)

As you can see, there are a lot of ugly things here...

What I want: I want the train function to use a new batch of features at each step. But here, it uses the same batch of 1000 features for all 100 steps, because the get_inputs function is only called once, when training starts. Is there an easy way to do this?
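
The behaviour described above can be illustrated with a plain-Python analogy. This is only a sketch, not TensorFlow code: train, constant_input_fn and streaming_input_fn are hypothetical stand-ins, and the assumption is that the estimator calls input_fn once to build the graph and then re-evaluates whatever it returned at every step.

```python
import itertools


def train(input_fn, steps):
    """Hypothetical stand-in for a training loop (not the Estimator API)."""
    get_batch = input_fn()  # called once, like building the graph
    return [get_batch() for _ in range(steps)]  # evaluated at every step


def constant_input_fn(rows, batch_size):
    """Reads one batch eagerly and freezes it, like wrapping data in tf.constant."""
    def input_fn():
        batch = list(itertools.islice(iter(rows), batch_size))
        return lambda: batch
    return input_fn


def streaming_input_fn(rows, batch_size):
    """Returns a callable that pulls a fresh batch each time it is evaluated."""
    def input_fn():
        it = iter(rows)
        return lambda: list(itertools.islice(it, batch_size))
    return input_fn


rows = range(6)
same = train(constant_input_fn(rows, 2), steps=3)
fresh = train(streaming_input_fn(rows, 2), steps=3)
```

In this toy version, same repeats the first batch three times, while fresh contains three different batches.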

I tried looping over estimator.train with steps=1, but this reloads the graph every time and becomes very slow.

I don't know what to do now, and I don't know if it's even possible...

Thanks for helping me!

Magoo
Kayoku

2 Answers

1

Short version: convert your CSV file to TFRecords and then use tf.contrib.data.TFRecordDataset. Long version: see the question and accepted answer here (copied below for convenience).


Check out the tf.contrib.data.Dataset API. I suspect you'll be best off converting your CSVs to TFRecord files and using TFRecordDataset. There's a thorough tutorial here.

Step 1: Convert the CSV data to TFRecords data. Example code below.

import tensorflow as tf


def read_csv(filename):
    with open(filename, 'r') as f:
        out = [line.rstrip().split(',') for line in f.readlines()]
    return out


csv = read_csv('data.csv')
with tf.python_io.TFRecordWriter("data.tfrecords") as writer:
    for row in csv:
        features, label = row[:-1], row[-1]
        features = [float(f) for f in features]
        label = int(label)
        example = tf.train.Example()
        example.features.feature[
            "features"].float_list.value.extend(features)
        example.features.feature[
            "label"].int64_list.value.append(label)
        writer.write(example.SerializeToString())

This assumes the labels are integers in the last column with float features in the preceding columns. This only needs to be run once.
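
Note that read_csv above loads every line with readlines(), which may not suit a file too large for memory. A minimal sketch of a streaming alternative using only the standard library follows; iter_csv_rows is a hypothetical helper name, and it makes the same assumption about column layout (float features, integer label last).

```python
import csv


def iter_csv_rows(filename):
    """Yield (features, label) one row at a time, never holding the whole file."""
    with open(filename, "r", newline="") as f:
        for row in csv.reader(f):
            if not row:  # skip blank lines
                continue
            # Assumption: float features first, integer label in the last column.
            yield [float(v) for v in row[:-1]], int(row[-1])
```

Each yielded (features, label) pair could then be written with the same tf.train.Example code as in the loop above, in place of iterating over the fully loaded list.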

Step 2: Write a dataset that decodes these record files.

def parse_function(example_proto):
    # n_features (the number of float feature columns) must be defined beforehand.
    features = {
        'features': tf.FixedLenFeature((n_features,), tf.float32),
        'label': tf.FixedLenFeature((), tf.int64)
    }
    parsed_features = tf.parse_single_example(example_proto, features)
    return parsed_features['features'], parsed_features['label']


def input_fn():
    dataset = tf.contrib.data.TFRecordDataset(['data.tfrecords'])
    dataset = dataset.map(parse_function)
    dataset = dataset.shuffle(shuffle_size)
    dataset = dataset.repeat()  # repeat indefinitely
    dataset = dataset.batch(batch_size)
    print(dataset.output_shapes)
    features, label = dataset.make_one_shot_iterator().get_next()
    return features, label
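
As far as I understand, a pipeline like input_fn above streams records rather than loading the whole file: only the shuffle buffer and the current batch are held at once. The lazy-batching part of that idea can be sketched in plain Python; batched is a hypothetical helper for illustration, not a TensorFlow function, and it ignores shuffling and repeating.

```python
import itertools


def batched(records, batch_size):
    """Yield lists of up to batch_size items, consuming records lazily."""
    it = iter(records)
    while True:
        batch = list(itertools.islice(it, batch_size))
        if not batch:
            return
        yield batch


# Works on any iterator, including an unbounded one: only one batch is
# materialised at a time.
first_two = list(itertools.islice(batched(itertools.count(), 4), 2))
```

The design point is that the consumer asks for one batch at a time, so memory use depends on batch_size rather than on the size of the input.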

To test (independent of the estimator):

batch_size = 4
shuffle_size = 10000
features, labels = input_fn()
with tf.Session() as sess:
    f_data, l_data = sess.run([features, labels])
print(f_data, l_data)

For use with tf.estimator.Estimator:

estimator.train(input_fn, max_steps=1e7)
DomJack
  • Hi, thanks for your answer. I already saw your post, but this solution requires converting the CSV to TFRecord. I'm wondering whether we can avoid that by just using the queue and batch functions. – Kayoku Aug 28 '17 at 13:53
  • You could... but I'm fairly sure that's all the dataset is doing under the hood. You could read straight from the CSV and parse the string data as you train, but it'll be slower. The only downside of TFRecords that I'm aware of is that they take up extra space. Is that really a concern? If you're loading images, you can just save the path in the TFRecords and then read from file as normal. – DomJack Aug 28 '17 at 13:57
  • No, duplicating the data as TFRecord isn't really a problem; it's just for my own knowledge (and because I wasted a lot of time on this question, haha). Do you have any idea how to do it from CSV without conversion? And with your solution, the whole dataset is not loaded in memory, just the current batch, right? I'll try this tomorrow. – Kayoku Aug 28 '17 at 14:12
  • There's definitely a way, but I highly doubt it'll be the best way. I spent a long time trying to make a data pipeline that worked the way I wanted it to before I realised there are smarter people in bigger teams doing a better job and considering edge cases with more versatility :). Besides, there are more fun problems than data IO. – DomJack Aug 28 '17 at 14:17
  • Ok ok, you win. I'm pretty tired of this too. Thanks again :) – Kayoku Aug 28 '17 at 14:22
  • Hi, I was looking for a pure CSV solution (yeah, I know, it's boring, but I'm an obstinate boy), and I'm pretty close to finding it, but tf.train.batch seems to block for no reason. If you have any idea, DomJack, I've posted the code below. Thanks! – Kayoku Aug 29 '17 at 12:32
0

If you're concerned about tf.train.start_queue_runners not being called, try the following:

class ThreadStartHook(tf.train.SessionRunHook):
    def after_create_session(self, session, coord):
        self.coord = coord
        self.threads = tf.train.start_queue_runners(coord=coord, sess=session)

    def end(self, session):
        self.coord.request_stop()
        self.coord.join(self.threads)


estimator.train(input_fn, hooks=[ThreadStartHook()])

I had similar thoughts when I started, but found it wasn't necessary.

DomJack
  • OK! I think this code solves the problem. But another one appears... (otherwise it wouldn't be fun, yeah). "Passed Tensor("sparse_softmax_cross_entropy_loss/value:0", shape=(), dtype=float32) should have graph attribute that is equal to current graph". I'll investigate this bug now. – Kayoku Aug 30 '17 at 08:03
  • OK, I understand better now... In fact, we need to call "start_queue_runners" AFTER tf.train.batch, otherwise the internal queue will not start! (tested and approved) – Kayoku Aug 30 '17 at 08:20