0

I read both from the documentation and from this answer that it is possible to determine the table destination dynamically. I used exactly the similar approach as below:

PCollection<Foo> foos = ...;
foos.apply(BigQueryIO.write().to(new SerializableFunction<ValueInSingleWindow<Foo>, TableDestination>() {
  @Override
  public TableDestination apply(ValueInSingleWindow<Foo> value) {  
    Foo foo = value.getValue();
    // Also available: value.getWindow(), getTimestamp(), getPane()
    String tableSpec = ...;
    String tableDescription = ...;
    return new TableDestination(tableSpec, tableDescription);
  }
}).withFormatFunction(new SerializableFunction<Foo, TableRow>() {
  @Override
  public TableRow apply(Foo foo) {
    return ...;
  }
}).withSchema(...));

However, I get the following compile error:

The method to(String) in the type BigQueryIO.Write<Object> is not applicable for the arguments (new SerializableFunction<ValueInSingleWindow<Foo>,TableDestination>(){})

Any help would be appreciated.

Edit for clarification on how I use windowing in my case:

PCollection<Foo> validFoos = ...;           
PCollection<TableRow> validRows = validFoos.apply(ParDo.named("Convert Foo to table row")
        .of(new ConvertToValidTableRowFn()))
        .setCoder(TableRowJsonCoder.of());
TableSchema validSchema = ConvertToValidTableRowFn.getSchema();    

validRows.apply(Window.<TableRow>into(CalendarWindows.days(1))).apply(BigQueryIO.writeTableRows()
        .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
            @Override
            public TableDestination apply(ValueInSingleWindow<TableRow> value) {
                TableRow t = value.getValue();
                String fooName = ""; // get name from table
                TableDestination td = new TableDestination(
                        "my-project:dataset.table$" + fooName, "");
                return td;
            }
        }));

In this case I got the following error The method apply(PTransform<? super PCollection<TableRow>,OutputT>) in the type PCollection<TableRow> is not applicable for the arguments (Window<TableRow>).

Ali
  • 97
  • 6
  • What version of the SDK are you using? From other post - _"This feature will be included in the first stable release of Apache Beam and into the next release of Dataflow SDK (which will be based on the first stable release of Apache Beam). Right now you can use this by running your pipeline against a snapshot of Beam at HEAD from github."_ – Graham Polley Jun 01 '17 at 00:09
  • I use Apache Beam 2.0.0 stable version released on May. In its documentation it is said that this feature is included. See sharding part in [this documentation](https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html). – Ali Jun 01 '17 at 07:25
  • I just came across [this fresh blogpost](https://medium.com/google-cloud/bigquery-partitioning-with-beam-streams-97ec232a1fcc) dealing with this. It has some difference in syntax (returning `TableReference`s instead of `TableDestination`s) and separates the code into a class (makes it a bit cleaner). I haven't tested it myself (and I used similar code to yours in the past), but I hope this helps. – Matthias Baetens Jun 01 '17 at 14:54
  • I tried it really quickly myself with your code, and it doesn't seem to give any errors for me. Can you check your POM and make sure your BEAM version is 2.1.0-SNAPSHOT? – Matthias Baetens Jun 01 '17 at 15:25

1 Answers1

0

I believe the compilation error comes from the fact that you preform this operation on PCollection<Foo>, while actually it expects windowed values. So you should first use .apply(Window.<Foo>into(...)), and then determine the table destination based on your window.

You can see examples in this answer, or this answer, as well as in the documentation you mentioned.

Alexey
  • 318
  • 1
  • 2
  • 10
  • Thanks for reply, but when I apply that windowing exactly described in [this answer](https://stackoverflow.com/questions/43505534/writing-different-values-to-different-bigquery-tables-in-apache-beam/43655461#43655461), this time I get `The method apply(PTransform super PCollection,OutputT>) in the type PCollection is not applicable for the arguments (Window)` – Ali Jun 01 '17 at 09:27
  • You should change the code to be `(Window.into(..))`, because in your case you have a `PCollection` and you use the more general `write()` method, while the answer code uses `PCollection` and `writeTableRows()` method. – Alexey Jun 01 '17 at 09:31
  • Yeah, but in this case I converted my `PCollection foos` into `PCollection fooRows` before I used this function. – Ali Jun 01 '17 at 09:49
  • Could try the snippet from [this doc](https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html), i.e.: `PCollection quotes = ... quotes.apply(Window.into(CalendarWindows.days(1))) .apply(BigQueryIO.writeTableRows() .withSchema(schema) .to(new SerializableFunction() { public String apply(ValueInSingleWindow value) { ... } }));` Which part the code doesn't compile for you exactly? – Alexey Jun 01 '17 at 10:51
  • I also did that, but I get the same error over the first `apply`. It complains as `The method apply(PTransform super PCollection,OutputT>) in the type PCollection is not applicable for the arguments (Window)` – Ali Jun 01 '17 at 10:55