
BigQuery allows partitioning only by date, at this time.

Let's suppose I have a table with 1 billion rows and an inserted_timestamp field, and that this field holds dates from the last year.

What is the right way to move existing data to a new partitioned table?

Edit:

I saw there was an elegant solution in Java for versions < 2.0, Sharding BigQuery output tables, also elaborated at BigQuery partitioning with Beam streams: parametrize the table name (or partition suffix) by windowing the data.

But I can't find BigQueryIO.Write in the 2.x Beam project, and there are no samples showing how to get the window time from a Python serializable function.
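For reference, a Python DoFn can receive its element's window through beam.DoFn.WindowParam; a minimal sketch follows (it tags each row with its window's day, but by itself this does not parametrize the sink):

    import apache_beam as beam

    class KeyByWindowDay(beam.DoFn):
        # beam.DoFn.WindowParam injects the element's window; for fixed
        # windows this is an IntervalWindow whose start can be formatted
        # as the day the element belongs to.
        def process(self, element, window=beam.DoFn.WindowParam):
            day = window.start.to_utc_datetime().strftime('%Y%m%d')
            yield (day, element)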

I tried to create the partitions in the pipeline, but it fails with a large number of partitions (it runs with 100 but fails with 1,000). Roughly, that attempt looked like the sketch below.
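A minimal sketch of that fan-out, with hypothetical day_list/day_index helpers and made-up project and dataset names (every day needs its own sink, which is why it stops scaling):

    import apache_beam as beam

    # One entry per day to migrate; a year of data means ~365 sinks.
    day_list = ['20170101', '20170102']

    def day_index(row, num_partitions):
        # Send each row to the shard matching its 'dia' value.
        return day_list.index(row['dia'].replace('-', ''))

    with beam.Pipeline() as p:
        rows = p | 'create' >> beam.Create([{'dia': '2017-01-01', 'import': 1.0}])
        shards = rows | 'bydays' >> beam.Partition(day_index, len(day_list))
        for i, day in enumerate(day_list):
            _ = shards[i] | 'write_%s' % day >> beam.io.Write(beam.io.BigQuerySink(
                'my_table_%s' % day,  # one day-sharded table per output
                dataset='my_dataset', project='my_project',
                schema='dia:DATE, import:FLOAT'))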

This is my code, as far as I could get:

               (p
                | 'lectura' >> beam.io.ReadFromText(input_table)
                | 'noheaders' >> beam.Filter(lambda s: s[0].isdigit())
                | 'addtimestamp' >> beam.ParDo(AddTimestampDoFn())
                | 'window' >> beam.WindowInto(beam.window.FixedWindows(60))
                | 'table2row' >> beam.Map(to_table_row)
                | 'write2table' >> beam.io.Write(beam.io.BigQuerySink(
                        output_table,   # <-- unable to parametrize by window
                        dataset=my_dataset,
                        project=project,
                        schema='dia:DATE, classe:STRING, cp:STRING, import:FLOAT',
                        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                        ))
                )

p.run()
dani herrera
  • https://stackoverflow.com/questions/38993877/migrating-from-non-partitioned-to-partitioned-tables has a couple of approaches that are relevant. Also, I think you should be able to use JSON or AVRO instead of CSV to avoid working with flat files. – Nhan Nguyen Oct 13 '17 at 17:47
  • @NhanNguyen, I just edited my question to be more specific. An elegant solution exists on <2.0 and I can't find it on 2.x. Thanks for your link; I followed it and it was a very related issue. Thanks again. – dani herrera Oct 16 '17 at 10:04

1 Answer


All of the functionality necessary to do this exists in Beam, although it may currently be limited to the Java SDK.

You would use BigQueryIO. Specifically, you may use DynamicDestinations to determine a destination table for each row.

From the DynamicDestinations example:

events.apply(BigQueryIO.<UserEvent>write()
  .to(new DynamicDestinations<UserEvent, String>() {
        public String getDestination(ValueInSingleWindow<UserEvent> element) {
          return element.getValue().getUserId();
        }
        public TableDestination getTable(String user) {
          return new TableDestination(tableForUser(user), 
            "Table for user " + user);
        }
        public TableSchema getSchema(String user) {
          return tableSchemaForUser(user);
        }
      })
  .withFormatFunction(new SerializableFunction<UserEvent, TableRow>() {
     public TableRow apply(UserEvent event) {
       return convertUserEventToTableRow(event);
     }
   }));
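For the date-partitioned case in the question, getDestination should be able to inspect the element's window (ValueInSingleWindow exposes getWindow(), which is an IntervalWindow for fixed windows) and return a table spec carrying a partition decorator such as my_table$20170101; getTable then wraps that spec in a TableDestination. That part is a sketch of the idea rather than tested code.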
Ben Chambers
  • Why is there no Python wrapper to do this? Should I tackle Dataflow projects with Java instead of Python? Do you know if Google is focusing resources on Java? I mean, if I work in Python, will I miss more functionality than this one? Thanks! – dani herrera Oct 16 '17 at 21:04
  • As this demonstrates, there is differing functionality between the Java and Python SDKs. Addressing these gaps is part of the on-going efforts on Apache Beam. This specific issue is tracked as [BEAM-2801](https://issues.apache.org/jira/browse/BEAM-2801). – Ben Chambers Oct 17 '17 at 18:44
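(Follow-up note: later Python SDK releases added per-element table routing; beam.io.WriteToBigQuery accepts a callable that picks a destination for each row. A minimal sketch against a current SDK, with hypothetical project/dataset/table names:

    import apache_beam as beam

    def day_partition(row):
        # Route each row to the daily partition named by its 'dia' value.
        return 'my_project:my_dataset.my_table$' + row['dia'].replace('-', '')

    with beam.Pipeline() as p:
        _ = (p
             | 'create' >> beam.Create([
                   {'dia': '2017-01-01', 'classe': 'a', 'cp': '08001', 'import': 1.0}])
             | 'write' >> beam.io.WriteToBigQuery(
                   day_partition,
                   schema='dia:DATE,classe:STRING,cp:STRING,import:FLOAT',
                   create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                   write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))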