
I have successfully implemented a Dataflow pipeline that writes to BigQuery. This pipeline transforms data for a Cloud ML Engine job. However, I noticed that the written rows are ordered (or at least grouped) by the labels of my data; that is, they visually appear to be organized in some way that is not completely random. When I then export the table to sharded .csv's in GCS, each sharded .csv is essentially ordered as well. This means the data cannot enter TensorFlow in random order, since TF reads one .csv at a time and the .csv's themselves are not random bags of rows.

Can anybody explain why the BigQuery table written by the Apache Beam pipeline would appear to be non-random if the original input data was randomized? Is there any way to force a shuffle/randomization of rows before writing to BigQuery? I need to ensure that the training data is completely random before it is loaded into the ML model.

reese0106

1 Answer


BigQuery tables don't have the concept of order or grouping; they are just a bag of rows. If one needs ordering or grouping, one writes a query with an ORDER BY or GROUP BY clause. If you have code that reads rows from BigQuery and requires those rows to be read in random order, you can do something like https://www.oreilly.com/learning/repeatable-sampling-of-data-sets-in-bigquery-for-machine-learning
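To illustrate the idea from that link: below is a minimal sketch of repeatable hash-based sampling, adapted to the Beam Python SDK discussed later in the comments. The project, dataset, table, and `row_key` column names are placeholders, and the 80% bucket threshold is just an example:

```python
import apache_beam as beam

# Placeholder table and column names. FARM_FINGERPRINT gives a deterministic
# 64-bit hash per row key; MOD(ABS(...), 10) assigns each row to one of ten
# stable buckets, and the WHERE clause keeps ~80% of them.
QUERY = """
SELECT *
FROM `my_project.my_dataset.my_table`
WHERE MOD(ABS(FARM_FINGERPRINT(CAST(row_key AS STRING))), 10) < 8
"""

with beam.Pipeline() as p:
    training_rows = (
        p
        | 'ReadRepeatableSample' >> beam.io.Read(
            beam.io.BigQuerySource(query=QUERY, use_standard_sql=True)))
```

Because the bucket assignment comes from a deterministic hash rather than RAND(), the same rows are selected on every run, which keeps train/eval splits consistent.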

jkff
  • I updated my question to try to be more specific. When I look at the first 50 rows of the BigQuery table, I can visually see that the rows are no longer random. So, when I export this data to GCS, the sharded .csv's are also no longer random, which creates problems in my input data. The repeatable sampling you mentioned does not help me, because I need to first export sharded .csv's to GCS and am not querying the tables directly. Could you help me understand why the BigQuery rows created by the Dataflow pipeline would appear non-random if a table is just a bag of rows? – reese0106 Oct 16 '17 at 22:37
  • "Bag of rows" means that the order of rows in a BigQuery table is 1) unspecified and 2) non-deterministic - i.e. BigQuery is allowed to return rows in any order, and in a different order every time you ask. It does not guarantee that the order will be "random-looking" in any sense. If you need randomness, you need to introduce it yourself, e.g. by doing ORDER BY HASH(something). – jkff Oct 16 '17 at 22:52
  • There can be many reasons why rows appeared non-random when you queried. Maybe the rows are indeed physically stored in BigQuery's storage engine in a way that, due to internal implementation details of the BigQuery storage and query engines (and possibly Dataflow), partially coincides with some ordering in the input data; maybe BigQuery, as a storage-layer optimization, chose to pre-sort the data or insert it into a sorted index (I don't know whether BQ does anything like this, but many other databases definitely would); or it could be something else. – jkff Oct 16 '17 at 22:55
  • Thanks! That makes sense to me. It seems like my only option would be to manually create a separate table within BigQuery that has randomness introduced. So, within Apache Beam, the best option I can think of is to 1) WriteToBigQuery() [this table has order] and then 2) add another step in the pipeline that reads from BigQuery in a random order and writes sharded .csv's to GCS from that randomized order. Can you think of any better way to introduce randomness to the sharded .csv's? – reese0106 Oct 16 '17 at 23:44
  • BigQuery tables don't have order (moreover, BQ doesn't even allow ORDER BY on large query results: https://cloud.google.com/bigquery/docs/writing-results#large-results); nothing you do with Dataflow or with your BigQuery export can change that. There is no way to guarantee *anything* about the order of output of a BigQuery export, except that it will contain all rows. It sounds like what you really want is to take the set of CSV files produced by BigQuery and produce a new set of CSV files that contains the same data but is more random-looking, is that correct? How large is the data? – jkff Oct 17 '17 at 00:06
  • Yes that sounds like it would work. The total size is around 30 GB of data – reese0106 Oct 17 '17 at 00:56
  • Also, if BQ tables don't have order, then how come when I repeat the pipeline, the first 100 rows end up essentially identical? My data has 100+ "labels" and the first 100 rows that I am looking at all end up with the same label, whereas that label only accounts for ~0.1% of the total dataset, so it is pretty close to impossible for this data to end up next to each other by complete chance. So, there does seem to be some sort of pre-sorting being done by BQ (or Dataflow), as you suggested, that results in a deterministic order of the data. – reese0106 Oct 17 '17 at 01:10
  • Many implementation details of BigQuery, Dataflow, the way that Dataflow ingested the data into BigQuery, or the way BigQuery retrieved data, could have caused BigQuery to store the data in such a way that upon retrieval it was not random-looking. "Tables don't have order" means "the database is free to return the rows in whatever order it wants and you can't rely on any particular order or even on the order being consistent", rather than "retrieval will be random-looking". This is true of most SQL databases I know, not just BigQuery. – jkff Oct 18 '17 at 19:09
  • To your main point: try BigQueryIO.read() + Reshuffle.viaRandomKey() + TextIO.write()? This also technically won't give you any hard guarantees of "random-lookingness", but in practice it will most likely at least distribute data randomly between the different generated files. – jkff Oct 18 '17 at 19:10
  • I am revisiting this and trying to implement it in the Python SDK -- it appears that Reshuffle is defined here: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L516 and it also seems to automatically add a random key and shuffle on it -- is there anything I would be missing here that Reshuffle.viaRandomKey() offers? – reese0106 Feb 02 '18 at 14:54
  • Also, I have upgraded to the latest Python SDK of apache_beam[gcp] and it doesn't appear to include Reshuffle under `apache_beam.transforms.util` -- am I missing something? – reese0106 Feb 02 '18 at 15:08
  • Ah, you're using the Beam Python SDK - I don't think it has this transform yet, but you can take a look at how it's implemented in the Java SDK and do something similar yourself. It just pairs every element with a randomly generated key, then groups by that key and ungroups back (see the sketch after this thread). – jkff Feb 02 '18 at 18:25
  • Yes, I'm looking at it now (https://stackoverflow.com/questions/48585959/apache-beam-transforms-util-reshuffle-not-available-for-gcp-dataflow) and trying to recreate it. Is all the windowing stuff needed? Is there anything beyond just GroupByKey() with the random number attached? https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L497 – reese0106 Feb 02 '18 at 18:47
  • @jkff I posted a potential solution here based on your comments -- could you take a look and let me know if it is sufficient? https://stackoverflow.com/questions/48585959/apache-beam-transforms-util-reshuffle-not-available-for-gcp-dataflow/48589126#48589126 – reese0106 Feb 02 '18 at 18:52
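For reference, here is a minimal sketch of the approach the comments converge on, written against the Beam Python SDK (which lacked Reshuffle at the time): attach a random key to each row, group by that key, drop the keys, and write sharded CSVs. The table spec, column names, and output path are placeholders, and this is only an approximation of the Java SDK's Reshuffle.viaRandomKey(), not a drop-in equivalent:

```python
import random
import apache_beam as beam

class ReshuffleViaRandomKey(beam.PTransform):
    """Approximates the Java SDK's Reshuffle.viaRandomKey(): attach a random
    key to every element, group by that key, then discard the keys. The
    GroupByKey forces a shuffle, which redistributes rows across output
    shards in a (practically) random way."""

    def expand(self, pcoll):
        return (
            pcoll
            | 'PairWithRandomKey' >> beam.Map(
                lambda row: (random.randint(0, 4095), row))
            | 'GroupByRandomKey' >> beam.GroupByKey()
            | 'DropKeys' >> beam.FlatMap(lambda kv: kv[1]))

def to_csv_line(row):
    # Placeholder: turn a BigQuery row dict into one CSV line; the column
    # names here are hypothetical.
    return ','.join(str(row[col]) for col in ['label', 'feature1', 'feature2'])

with beam.Pipeline() as p:
    _ = (
        p
        | 'ReadFromBQ' >> beam.io.Read(
            beam.io.BigQuerySource('my_project:my_dataset.my_table'))
        | 'Shuffle' >> ReshuffleViaRandomKey()
        | 'FormatCSV' >> beam.Map(to_csv_line)
        | 'WriteCSV' >> beam.io.WriteToText(
            'gs://my_bucket/training/data', file_name_suffix='.csv'))
```

With only 4,096 distinct keys, all rows sharing a key are grouped onto one worker, so a wider key range may spread both the shuffle and the load better on large datasets.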