I read both from the documentation and from this answer that it is possible to determine the table destination dynamically. I used exactly the similar approach as below:
PCollection<Foo> foos = ...;
foos.apply(BigQueryIO.write().to(new SerializableFunction<ValueInSingleWindow<Foo>, TableDestination>() {
@Override
public TableDestination apply(ValueInSingleWindow<Foo> value) {
Foo foo = value.getValue();
// Also available: value.getWindow(), getTimestamp(), getPane()
String tableSpec = ...;
String tableDescription = ...;
return new TableDestination(tableSpec, tableDescription);
}
}).withFormatFunction(new SerializableFunction<Foo, TableRow>() {
@Override
public TableRow apply(Foo foo) {
return ...;
}
}).withSchema(...));
However, I get the following compile error:
The method to(String) in the type BigQueryIO.Write<Object> is not applicable for the arguments (new SerializableFunction<ValueInSingleWindow<Foo>,TableDestination>(){})
Any help would be appreciated.
Edit for clarification on how I use windowing in my case:
PCollection<Foo> validFoos = ...;
PCollection<TableRow> validRows = validFoos.apply(ParDo.named("Convert Foo to table row")
.of(new ConvertToValidTableRowFn()))
.setCoder(TableRowJsonCoder.of());
TableSchema validSchema = ConvertToValidTableRowFn.getSchema();
validRows.apply(Window.<TableRow>into(CalendarWindows.days(1))).apply(BigQueryIO.writeTableRows()
.to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
@Override
public TableDestination apply(ValueInSingleWindow<TableRow> value) {
TableRow t = value.getValue();
String fooName = ""; // get name from table
TableDestination td = new TableDestination(
"my-project:dataset.table$" + fooName, "");
return td;
}
}));
In this case I got the following error The method apply(PTransform<? super PCollection<TableRow>,OutputT>) in the type PCollection<TableRow> is not applicable for the arguments (Window<TableRow>)
.