10

With Horovod, you run N independent instances (so it is a form of between-graph replication), which communicate via special Horovod ops (essentially broadcast + reduce).

Now let's say either instance 0 or some other external instance loads your data (via tf.data.Dataset). How would you distribute the batches from iterator.get_next() to the individual instances? Using Horovod broadcast would be inefficient, as it would copy all the data to all instances.

Having the dataset in every instance, doing all the loading there, and then using shard on the dataset would also be inefficient: you would load the data everywhere and then throw away (N-1)/N of it. So I also do not want sharding; instead, the dataset loading should happen only in a single (producer/dataset worker) instance, which then distributes the batches to all the train workers.
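For reference, the sharded variant I want to avoid would look roughly like this (only a sketch; make_dataset() and the TFRecord file name are placeholders for the actual pipeline):

import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()

def make_dataset():
    # Placeholder for the full loading + preprocessing pipeline.
    return tf.data.TFRecordDataset(["data.tfrecord"]).batch(32)

# Every instance builds the whole pipeline and then keeps only 1/N of the batches:
dataset = make_dataset().shard(num_shards=hvd.size(), index=hvd.rank())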

I guess the TF MultiDeviceIterator provides similar functionality (or basically exactly that), but I'm not sure whether that works together with Horovod, or how you would set it up.

Or maybe you can make the distribution somehow via TF workers (guide)? (Maybe that is how you would configure MultiDeviceIterator as well?)

If possible, this should be via TensorFlow operations / functions (there are many related functions which might already give me this, but I might not know about them, or have misunderstood how it works). Or maybe the answer is that TensorFlow does not provide any such functionality yet? (This would still be useful to know. Then I would implement my own solution in C++, wrapped as a TensorFlow op. But before doing so, it would be good to know whether this is really necessary.)

(Related is also this Horovod issue.)

(This question is actually a bit more generic than just Horovod, although Horovod might be a good example. You might have this problem always for distributed TensorFlow?)

(I collected an overview of all the distributed TensorFlow terminology and aspects here, mostly for clarification.)

(Possibly related are also these questions: this, this, this, this and this.)

Albert
  • I don't think that is possible with Horovod, and most definitely not with plain TensorFlow. What you would need is a [MPI scatter operation](https://mpitutorial.com/tutorials/mpi-scatter-gather-and-allgather/#an-introduction-to-mpi_scatter), but Horovod only implements all-reduce, all-gather and broadcast semantics. If you store your data as TFRecord files, your best bet might be to [split them into many files](https://stackoverflow.com/q/54519309/1782792) and read a shard from each instance, as suggested in the answer... – jdehesa Jun 03 '20 at 10:44
  • @jdehesa As you mention this, I added a [feature request about adding MPI scatter support for Horovod](https://github.com/horovod/horovod/issues/2007). – Albert Jun 03 '20 at 13:38
  • [TensorFlow 2.3.0](https://github.com/tensorflow/tensorflow/releases/tag/v2.3.0-rc0) is introducing a [`tf.data` service](https://www.tensorflow.org/api_docs/python/tf/data/experimental/service/distribute) for distributed training. I'm not sure if it solves your problem in particular but might be worth for you to keep an eye on it. – jdehesa Jun 29 '20 at 10:29
  • @jdehesa Oh, thanks. That looks exactly like what I'm looking for, when you use a shared job (setting `job_name`; see the sketch after these comments). Or at least almost, because the data will not be distributed evenly, but on a first-come first-served basis (which is maybe ok). Interestingly, when the job is not shared, this corresponds to another solution I have already implemented: each worker simply uses a different random seed for the dataset shuffling. – Albert Jun 29 '20 at 12:40
  • Albert, take a look at fanstore. If you are interested, then I can post an answer how this works. https://github.com/TACC/FanStore – denfromufa Sep 15 '20 at 01:28
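A rough, untested sketch of the tf.data service variant mentioned above, written against the TF 2.3 API; the dispatcher address "grpc://dispatcher-host:5000" and make_dataset() are placeholders, and the server constructor arguments may differ between TF versions:

import tensorflow as tf

# Somewhere, a dispatcher and at least one tf.data service worker must be running,
# e.g. via tf.data.experimental.service.DispatchServer / WorkerServer
# (their constructor arguments changed between TF versions, so check the docs).

def make_dataset():
    # Placeholder for the full loading + preprocessing pipeline.
    return tf.data.Dataset.range(1000).batch(32)

# On every Horovod training instance (consumer side):
dataset = make_dataset()
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode="parallel_epochs",
    service="grpc://dispatcher-host:5000",  # placeholder dispatcher address
    job_name="shared_job"))  # same job_name on all instances => one shared job,
                             # batches are handed out first-come first-served

for batch in dataset:
    pass  # feed the batch into the Horovod training step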

2 Answers

6

As you said, copying the data in each instance and sharding the data for each instance would be impractical.

One solution would then be to move the data loading into a separate data process and have each instance pull data from that process, as shown in the figure below. For example, this communication can be established using a queue.

In such a system, the data process would load the dataset, preprocess it into batches and push the batches into a queue. Each training instance would then pull batches from this queue. For example, you could pass the queue as a generator into the dataset API (see tf.data.Dataset.from_generator); a rough code sketch follows the figure below. Also, if batches are not produced fast enough, it is possible to create more data processes to increase the batch throughput.

Depending on your use case, the implementation specifics will vary. For more information, you can look up Networking and Interprocess communication and Multiprocessing pipes and queues.

                                                             Training        
                                                         +--------------+  ++
                                                         |              |   |
                                                    +----+  Instance 1  |   |
                                                    |    |              |   |
                                                    |    +--------------+   |
                                                    |                       |
                      Preprocessing                 |                       |
                  +--------------------+            +---->      X           |
                  |                    |            |                       |
             Load |                    | Batches    +           X           |
    Dataset+------>    Data Process    +--------->Queue                     |  N instances
                  |                    |            +           X           |  Distributed training
                  |                    |            |                       |  For example, using
                  +--------------------+            +---->      X           |  Horovod broadcast + reduce
                                                    |                       |
                                                    |        Training       |
                                                    |    +--------------+   |
                                                    |    |              |   |
                                                    +----+  Instance N  |   |
                                                         |              |   |
                                                         +--------------+  ++
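A minimal, single-machine sketch of this queue-based setup, assuming Python's multiprocessing for the queue; BATCH_SHAPE and the random-batch producer are placeholders for the real loading + preprocessing, and across multiple machines you would need a networked queue (e.g. a multiprocessing manager or a message broker) instead:

import multiprocessing as mp
import numpy as np
import tensorflow as tf

BATCH_SHAPE = (32, 128)  # placeholder batch shape (batch_size, feature_dim)

def data_process(queue):
    # Data process: load + preprocess and push batches into the queue.
    while True:
        batch = np.random.rand(*BATCH_SHAPE).astype(np.float32)  # placeholder for real batches
        queue.put(batch)

def queue_generator(queue):
    # Training instance: pull batches from the queue.
    while True:
        yield queue.get()

if __name__ == "__main__":
    queue = mp.Queue(maxsize=16)  # bounded, so the producer cannot run arbitrarily far ahead
    mp.Process(target=data_process, args=(queue,), daemon=True).start()

    # Wrap the queue as a generator-backed tf.data.Dataset on the consumer side.
    dataset = tf.data.Dataset.from_generator(
        lambda: queue_generator(queue),
        output_types=tf.float32,
        output_shapes=BATCH_SHAPE)

    for batch in dataset.take(3):
        print(batch.shape)  # placeholder for the actual training step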

For a TensorFlow implementation, you could use tf.data.Dataset.shard with tf.data.TFRecordDataset.

The documentation addresses your inefficiency concern when using TFRecords:

Important caveats:

  • Be sure to shard before you use any randomizing operator (such as shuffle).

  • Generally it is best if the shard operator is used early in the dataset pipeline. For example, when reading from a set of TFRecord files, shard before converting the dataset to input samples. This avoids reading every file on every worker. The following is an example of an efficient sharding strategy within a complete pipeline:

d = Dataset.list_files(pattern)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
Maurice Qch
  • Yes, this is basically what I want. But my question is rather: How to implement it (using TensorFlow). I already have the very initial dataset as a `tf.data.Dataset` (in your figure the very left dataset). The preprocessing part is quite simple in TF. But how would you organize this? I guess this would run on a dedicated TF worker (via distributed TF). But **how to implement the queue (in TF)**? This is my main question here. I think the Python functions are not so useful here. If you say this does not exist in TF, I would rather implement this part in C++. – Albert May 29 '20 at 10:15
  • @Albert Thanks for your comment. I have updated the answer with more details on how to address this problem using TensorFlow. – Maurice Qch May 30 '20 at 12:43
  • But this code is exactly how I don't want to do it (using sharding). I want to have a single dataset pipeline and distribute it across all train workers (like I described, like you have it in your picture). – Albert May 30 '20 at 13:19
  • @Albert You mentioned not wanting to do sharding because it "would also be inefficient, as you would load the data everywhere, and then throw away (N-1)/N of it". In the proposed solution, only the necessary data is loaded for each instance and nothing is thrown out. The `shard` solution is indeed different in terms of implementation but it should be equivalent in terms of results for uniform sampling. – Maurice Qch May 31 '20 at 19:36
  • Are you sure? So `shard` on a `TFRecordDataset` is specially implemented? Where can I see this? But anyway, my question is specifically about not doing it that way. I want to know how I can do what I asked for (and what you also outlined in your picture). – Albert May 31 '20 at 20:22
  • Yes, the documentation of `shard` specifies "when reading from a set of TFRecord files, shard before converting the dataset to input samples. This avoids reading every file on every worker.". To my knowledge, tensorflow does not provide such a solution as `shard` provides the same results. For the exact implementation, it depends on your use case (one computer vs multiple computers, network topology, storage infrastructure etc.). As explained in the answer, the implementation can be done using processes and interprocess communication. It can be done in whichever language you prefer. – Maurice Qch Jun 01 '20 at 09:22
  • To be more precise, `shard` "loads the data everywhere, and then throw away (N-1)/N of it" only if you load the entire dataset samples and then use `shard`. In the case of a `TFRecordDataset`, it does not happen if you use `shard` right after initialization. – Maurice Qch Jun 01 '20 at 09:59
  • I think you refer to when you have multiple input files. In your example, this is not the case. But anyway, this is not how I want to do it (using `shard`). I want to do it like I described in my question, and like you have it in your picture. I want to do it in TensorFlow, and this is what my question is about: How to do exactly that in TensorFlow (not any other solution like IPC in Python). – Albert Jun 01 '20 at 10:59
  • @Albert You are right, I inserted the wrong code sample. It is now fixed. Unfortunately, I do not know of any other ways to achieve what you want. – Maurice Qch Jun 01 '20 at 14:28
1

I would recommend taking a look at YogaDL. It allows you to cache your dataset such that during training (or re-training) you only access the data you need on that shard, rather than throwing away (N-1)/N of your data reads.

Aaron