
I have upgraded to the latest apache_beam[gcp] package via `pip install --upgrade apache_beam[gcp]`. However, I noticed that Reshuffle() does not appear in the [gcp] distribution. Does this mean that I will not be able to use Reshuffle() in any Dataflow pipelines? Is there any way around this? Or is it possible that the pip package is just not up to date, and that if Reshuffle() is in master on GitHub, it will be available on Dataflow?

Based on the response to this question, I am trying to read data from BigQuery and then randomize it before writing it to CSVs in a GCP Storage bucket. I have noticed that the sharded .csv files I use to train my GCMLE model are not truly random. Within TensorFlow I can randomize the batches, but that only shuffles the rows within each file that is built up in the queue; my issue is that the files currently being generated are themselves biased in some way. Any suggestions for other ways to shuffle right before writing to CSV in Dataflow would be much appreciated.
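To illustrate why shuffling before sharding matters when in-file shuffling is not enough, here is a plain-Python sketch (no Beam or TensorFlow involved). It assumes, hypothetically, that the query returns rows ordered by label and that output shards are cut from contiguous runs of rows; a real Dataflow sink decides its own sharding. The helpers `contiguous_shards` and `labels_per_shard` are made up for this illustration, and sorting on a random key stands in for the random-key trick on a single machine.

```python
import random


def contiguous_shards(rows, num_shards):
    """Split rows into num_shards contiguous, roughly equal chunks (hypothetical sharding)."""
    size = -(-len(rows) // num_shards)  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]


def labels_per_shard(shards):
    """Return the set of distinct labels found in each shard."""
    return [{label for label, _ in shard} for shard in shards]


# Hypothetical input: 1,000 (label, feature) rows, already ordered by label.
rows = [(0, i) for i in range(500)] + [(1, i) for i in range(500)]

# Without a shuffle, each contiguous shard only ever sees one label.
print(labels_per_shard(contiguous_shards(rows, 4)))
# → [{0}, {0}, {1}, {1}]

# Shuffle first by sorting on a random key, then shard.
rng = random.Random(0)
shuffled = [row for _, row in sorted((rng.random(), row) for row in rows)]
print(labels_per_shard(contiguous_shards(shuffled, 4)))
```

Shuffling inside each shard afterwards (as TensorFlow's batch shuffling does) cannot fix the first case, because every shard already contains a single label.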

reese0106

1 Answer


One approach is to recreate the shuffle myself:

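```python
import random
from apache_beam import FlatMap, GroupByKey, Map

# Attach a random key, group on it (forcing a shuffle), then drop the keys.
shuffled_data = (unshuffled_pcoll
        | 'AddRandomKeys' >> Map(lambda t: (random.getrandbits(32), t))
        | 'GroupByKey' >> GroupByKey()
        | 'RemoveRandomKeys' >> FlatMap(lambda t: t[1]))
```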
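The three steps above can be sanity-checked without a Dataflow job using a plain-Python stand-in. This is a sketch, not Beam API: `simulate_random_key_shuffle` is a hypothetical helper, and it models the runner's shuffle by visiting key groups in sorted key order, whereas a real runner picks its own order across workers.

```python
import random
from collections import defaultdict


def simulate_random_key_shuffle(elements, seed=None):
    """Plain-Python stand-in for AddRandomKeys -> GroupByKey -> RemoveRandomKeys.

    Assumption: the shuffle is modeled by iterating groups in sorted key order.
    """
    rng = random.Random(seed)
    # 'AddRandomKeys': pair each element with a random 32-bit key.
    keyed = [(rng.getrandbits(32), e) for e in elements]
    # 'GroupByKey': collect all values that share a key.
    groups = defaultdict(list)
    for key, value in keyed:
        groups[key].append(value)
    # 'RemoveRandomKeys' (FlatMap): emit every value of every group.
    result = []
    for key in sorted(groups):
        result.extend(groups[key])
    return result


print(simulate_random_key_shuffle(["a", "b", "c", "d"], seed=42))
```

The point the simulation makes is that every input element comes out exactly once; only the order changes, and that order is driven by the random keys rather than by the input.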

My remaining question is whether I need to worry about the windowing or ExpandIterable sections of the Reshuffle source code.

reese0106
  • The second one has to be a FlatMap (which is what ExpandIterable effectively does). Windowing stuff is important if you use a custom trigger. – jkff Feb 02 '18 at 22:07
  • Thanks! I was trying to figure out how to solve the issue `'_UnwindowedValues' object does not support indexing`, and I'm assuming that is what you're saying. What would be an example of a custom trigger? – reese0106 Feb 02 '18 at 23:50
  • I'm referring to https://beam.apache.org/documentation/programming-guide/#triggers . I forgot that currently only Beam Java supports custom triggers. Then you don't need to worry about triggers at all - simply changing Map to FlatMap in your code should be enough. – jkff Feb 03 '18 at 01:10
  • There are [Pre-Written Transforms in the Cloud Dataflow SDK](https://cloud.google.com/dataflow/model/library-transforms#mapshufflereduce-style-processing). The page includes a sub-chapter on "Map/Shuffle/Reduce-Style Processing", where you may find useful code snippets. A more comprehensive solution with complete code can be found in the answers to the [How to reshuffle a PCollection?](https://stackoverflow.com/questions/40767189/how-to-reshuffle-a-pcollectiont) question. – George Feb 03 '18 at 01:40
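The Map versus FlatMap point from the comments can be seen with two plain-Python stand-ins. These are not Beam functions: `map_like` and `flat_map_like` are hypothetical helpers that only model "one output per input" versus "one output per item of the returned iterable". With Map, each downstream element is a whole group of values (in Beam, an iterable such as `_UnwindowedValues`, which likely explains the indexing error mentioned in the comments), whereas FlatMap emits the original rows again.

```python
def map_like(fn, elements):
    """Mimics Map: exactly one output per input element."""
    return [fn(e) for e in elements]


def flat_map_like(fn, elements):
    """Mimics FlatMap: fn returns an iterable, and each item becomes its own output."""
    return [item for e in elements for item in fn(e)]


# Hypothetical GroupByKey output: (random_key, iterable_of_values) pairs.
grouped = [(17, ("row-a", "row-b")), (42, ("row-c",))]

print(map_like(lambda kv: kv[1], grouped))
# → [('row-a', 'row-b'), ('row-c',)]
print(flat_map_like(lambda kv: kv[1], grouped))
# → ['row-a', 'row-b', 'row-c']
```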