
As a follow-up to the following question and answer:

https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey

I'd like to confirm with the Google Dataflow engineering team (@jkff) whether the third option proposed by Eugene is at all possible with Google Dataflow:

"have a ParDo that takes these keys and creates the BigQuery tables, and another ParDo that takes the data and streams writes to the tables"

My understanding is that a ParDo/DoFn processes each element individually. How could we specify a table name (a function of the keys passed in via side inputs) when writing out from processElement of a ParDo/DoFn?

Thanks.

Update: added a DoFn below, which obviously does not work, since c.element().getValue() is an Iterable, not a PCollection.

PCollection<KV<String, Iterable<String>>> output = ...;

public class DynamicOutput2Fn extends DoFn<KV<String, Iterable<String>>, Integer> {

    private final PCollectionView<List<String>> keysAsSideInputs;

    public DynamicOutput2Fn(PCollectionView<List<String>> keysAsSideInputs) {
        this.keysAsSideInputs = keysAsSideInputs;
    }

    @Override
    public void processElement(ProcessContext c) {
        List<String> keys = c.sideInput(keysAsSideInputs);
        String key = c.element().getKey();

        // The below does not work: c.element().getValue() is an Iterable,
        // not a PCollection, so transforms cannot be applied to it.
        // How could we write the value out to a sink, be it a GCS file or a BQ table?
        c.element().getValue().apply(ParDo.of(new FormatLineFn()))
                .apply(TextIO.Write.to(key));

        c.output(1);
    }
}
    This is now available out of the box in the latest Beam: http://stackoverflow.com/questions/43505534/writing-different-values-to-different-bigquery-tables-in-apache-beam/43505535 – jkff Apr 19 '17 at 20:35

1 Answer


The BigQueryIO.Write transform does not support this. The closest thing you can do is to use per-window tables and encode whatever information you need to select the table in the window objects, using a custom WindowFn.

If you don't want to do that, you can make BigQuery API calls directly from your DoFn. With this, you can set the table name to anything you want, as computed by your code. The name could be looked up from a side input or computed directly from the element the DoFn is currently processing. To avoid making too many small calls to BigQuery, you can batch up the requests using finishBundle().
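The buffer-in-processElement, flush-in-finishBundle pattern described above can be sketched in plain Java. This is a minimal illustration of the batching logic only: the class names and the flush/insert method are hypothetical stand-ins, not the real Dataflow or BigQuery APIs, and a real DoFn would replace the stubbed flush with a call to the BigQuery tabledata.insertAll API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of per-table batching: rows are buffered by table name in
// processElement() and flushed in finishBundle(), so each bundle makes
// a small number of larger insert calls instead of one call per element.
public class BatchingInserter {
    private static final int MAX_BATCH = 500; // flush threshold, arbitrary choice

    private final Map<String, List<String>> buffers = new HashMap<>();
    private final List<String> flushed = new ArrayList<>(); // records flushes, for illustration

    // Analogous to DoFn.processElement: choose a table per element, buffer the row.
    public void processElement(String tableName, String row) {
        buffers.computeIfAbsent(tableName, k -> new ArrayList<>()).add(row);
        if (buffers.get(tableName).size() >= MAX_BATCH) {
            flush(tableName);
        }
    }

    // Analogous to DoFn.finishBundle: flush any remaining buffered rows.
    public void finishBundle() {
        for (String table : new ArrayList<>(buffers.keySet())) {
            flush(table);
        }
    }

    private void flush(String tableName) {
        List<String> rows = buffers.remove(tableName);
        if (rows == null || rows.isEmpty()) {
            return;
        }
        // In a real DoFn this is where the BigQuery insert call would go;
        // here we only record that a flush of this size happened.
        flushed.add(tableName + ":" + rows.size());
    }

    public List<String> getFlushed() {
        return flushed;
    }
}
```

The table name passed to processElement can come from the element's key or from a side input, which is exactly the flexibility BigQueryIO.Write lacked at the time.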

You can see how the Dataflow runner does the streaming import here: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java

danielm
  • Thanks for the reply. However, I still can't grasp the syntax for having the DoFn call BigQueryIO.Write in processElement. I need help with two things: 1. Could you please show me a quick DoFn example of the mentioned usage? 2. Will calling BigQueryIO.Write in processElement cause a performance issue, since it will be invoked for every element processed? Thanks. – Alan Mar 15 '16 at 05:14
  • Also added a DoFn in the OP; please share how to write out values per key in the PCollection<KV<String, Iterable<String>>>. Thanks. – Alan Mar 15 '16 at 05:23
  • Thanks, Daniel, for updating the answer. I really hope a new feature could be added to the Text and BQ IO writers to allow dynamically named files or tables. Not saying it's going to be an easy task, but it would be a really useful function. I've accepted the answer. – Alan Mar 17 '16 at 22:27