
Short question: I would like to split a BigQuery table into multiple smaller tables, based on the distinct values of a column. So, if the column country has 10 distinct values, it should split the table into 10 individual tables, each containing the respective country's data. Ideally this would be done from within a BigQuery query (using INSERT, MERGE, etc.).

What I am doing right now is exporting the data to Cloud Storage, then to local storage, splitting it locally, and then pushing the splits back into tables - which is a very time-consuming process.
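Ideally I'd just generate one `CREATE TABLE ... AS SELECT` statement per distinct value and run them from a client instead. A minimal sketch of how I imagine generating those statements (dataset/table names are placeholders; a BigQuery client would still have to execute each generated statement):

```java
import java.util.ArrayList;
import java.util.List;

public class SplitByCountry {

    // Build one CREATE TABLE ... AS SELECT statement per distinct country.
    // Each statement copies only the matching country's rows into its own table.
    static List<String> splitStatements(String sourceTable, String targetDataset,
                                        List<String> countries) {
        List<String> statements = new ArrayList<>();
        for (String country : countries) {
            statements.add(String.format(
                    "CREATE TABLE `%s.split_%s` AS SELECT * FROM `%s` WHERE country = '%s'",
                    targetDataset, country.toLowerCase(), sourceTable, country));
        }
        return statements;
    }
}
```

The list of distinct values would come from a `SELECT DISTINCT country FROM ...` query first.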

Thanks.

khan
  • btw, I tried https://stackoverflow.com/questions/42031135/split-table-into-multiple-tables-based-on-date-using-bigquery-with-a-single-quer ... but my data is a little too big for that approach - BigQuery maxes out and gives an error. – khan Nov 01 '18 at 20:28
  • I think you should look into partitioning your table with BigQuery (assuming it supports that). – Tim Biegeleisen Nov 01 '18 at 20:29
  • It only supports partitioning by `date`. – khan Nov 01 '18 at 20:30
  • A few questions: how large can a row be, and how many distinct values do you expect? – Mikhail Berlyant Nov 01 '18 at 20:31
  • I think my table's rows can go up to ~50KB max (most are smaller than that), and there are multiple millions of rows. Each row has 20 columns, of which `country` is one. – khan Nov 01 '18 at 20:34
  • Got it. Is it really a country field (so you would have about 200+ distinct values), or are you just using country as an alias - in that case, how many distinct values do you expect in that field? – Mikhail Berlyant Nov 01 '18 at 20:38
  • Maybe 500 distinct values max in that column, not more than that. – khan Nov 01 '18 at 20:49
  • Check also https://stackoverflow.com/a/43130999/5221944 - it has some more details/insights. Your numbers still look within the limits of 10,000 columns and 100MB per row - as I mentioned in some of my old posts, I was able to handle cases like yours; it is just a matter of tuning/adjusting. You mentioned that you tried it and the query gives an error - can you provide more details on this? Btw, the number of rows makes no difference - what matters is row size and the number of distinct values to partition on. If column names are long, use short aliases to manage the final row size ... – Mikhail Berlyant Nov 01 '18 at 21:24
  • Another part that can be removed, and which can potentially cause resource issues, is the use of ROW_NUMBER() - it is used to compact the pivot table, packing the data into as few rows as possible. At the bottom of that post you can find a version without this feature, allowing more data to be processed. – Mikhail Berlyant Nov 01 '18 at 21:29
  • Okay, I think I underestimated my table. Here is the error when I try your approach: `Error: Cannot query rows larger than 100MB limit.` – khan Nov 02 '18 at 00:24

2 Answers


If the data has the same schema, just leave it in one table and use the clustering feature: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#creating_a_clustered_table

#standardSQL
CREATE TABLE mydataset.myclusteredtable
PARTITION BY dateCol
CLUSTER BY country
OPTIONS (
  description="a table clustered by country"
) AS (
  SELECT ....
)

https://cloud.google.com/bigquery/docs/clustered-tables

The feature is in beta though.
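A query that filters on the clustering column should then scan less data - e.g. (reusing the names from the statement above):

```
#standardSQL
SELECT *
FROM mydataset.myclusteredtable
WHERE dateCol = '2018-11-01'  -- partition pruning
  AND country = 'DE'          -- cluster (block) pruning
```

Note that the upfront on-demand cost estimate does not account for clustering; only the billed bytes after the query actually runs reflect the pruning.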

Martin Weitzmann
  • Why not? See https://medium.com/google-cloud/bigquery-optimized-cluster-your-tables-65e2f684594b – Martin Weitzmann Jan 03 '19 at 07:28
  • I have different results. For example, I am running this against the public dataset: ```create table `temp.sample_clustered_table` partition by date(pickup_datetime) cluster by rate_code, payment_type OPTIONS( require_partition_filter=true ) as select * from `nyc-tlc.green.trips_2015` ``` to create a clustered table. However, when I query `temp.sample_clustered_table`, it only reduces the cost on the partition filter and doesn't reduce the cost further if I filter by, let's say, `rate_code`. Not sure why... you can try it. – khan Jan 03 '19 at 18:35
  • Maybe you're not using prefix-filtering? If you use something like `rate_code LIKE '%xyz'` it won't work. It has to be prefix or exact. – Martin Weitzmann Jan 03 '19 at 18:57
  • No, I am not using wildcard searches. I am doing `select * from temp.sample_clustered_table where date(pickup_datetime) = '2015-03-01' and rate_code=5;` but whichever `rate_code` I change to, it doesn't help reduce the cost. Point to note: the cost should change as `rate_code` is toggled in the query (because some `rate_code` values occur very few times in the dataset). – khan Jan 03 '19 at 19:00
  • It doesn't change the estimated cost - only the resulting cost. BigQuery can't estimate costs taking clusters into account. Did you actually run the queries to compare costs? – Martin Weitzmann Jan 04 '19 at 07:17

You can use Dataflow for this. This answer gives an example of a pipeline that queries a BigQuery table, splits the rows based on a column and then outputs them to different PubSub topics (which could be different BigQuery tables instead):

Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

PCollection<TableRow> weatherData = p.apply(
        BigQueryIO.Read.named("ReadWeatherStations").from("clouddataflow-readonly:samples.weather_stations"));

// One output tag per destination; the main output plus two side outputs.
final TupleTag<String> readings2010 = new TupleTag<String>() {};
final TupleTag<String> readings2000plus = new TupleTag<String>() {};
final TupleTag<String> readingsOld = new TupleTag<String>() {};

PCollectionTuple collectionTuple = weatherData.apply(ParDo.named("tablerow2string")
        .withOutputTags(readings2010, TupleTagList.of(readings2000plus).and(readingsOld))
        .of(new DoFn<TableRow, String>() {
            @Override
            public void processElement(DoFn<TableRow, String>.ProcessContext c) throws Exception {

                // Field index 2 holds the year; route each row to the matching output
                if (c.element().getF().get(2).getV().equals("2010")) {
                    c.output(c.element().toString());
                } else if (Integer.parseInt(c.element().getF().get(2).getV().toString()) > 2000) {
                    c.sideOutput(readings2000plus, c.element().toString());
                } else {
                    c.sideOutput(readingsOld, c.element().toString());
                }

            }
        }));
collectionTuple.get(readings2010)
        .apply(PubsubIO.Write.named("WriteToPubsub1").topic("projects/fh-dataflow/topics/bq2pubsub-topic1"));
collectionTuple.get(readings2000plus)
        .apply(PubsubIO.Write.named("WriteToPubsub2").topic("projects/fh-dataflow/topics/bq2pubsub-topic2"));
collectionTuple.get(readingsOld)
        .apply(PubsubIO.Write.named("WriteToPubsub3").topic("projects/fh-dataflow/topics/bq2pubsub-topic3"));

p.run();
medvedev1088