I want to pass a side input to a PCollection Partition and, based on it, divide my PCollection. Is there any way to do this?
// partitionCount is a placeholder: I want this value to come from a count computed over a PCollection.
PCollectionList<TableRow> part = merged.apply(Partition.of(partitionCount, new PartitionFn<TableRow>() {
    @Override
    public int partitionFor(TableRow row, int numPartitions) {
        return 0;
    }
}));
Is there any other way to partition my PCollection?
// Writing to a partitioned BigQuery table without DynamicDestinations
merge.apply("write into target", BigQueryIO.writeTableRows()
    .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
        @Override
        public TableDestination apply(ValueInSingleWindow<TableRow> value) {
            TableRow row = value.getValue();
            TableReference reference = new TableReference();
            reference.setProjectId("XYZ");
            reference.setDatasetId("ABC");
            System.out.println("date of row " + row.get("authorized_transaction_date_yyyymmdd").toString());
            LOG.info("date of row " + row.get("authorized_transaction_date_yyyymmdd").toString());
            String str = row.get("authorized_transaction_date_yyyymmdd").toString();
            // Replace the day with "01" so every row of a month maps to the same partition.
            str = str.substring(0, str.length() - 2) + "01";
            System.out.println("str value " + str);
            LOG.info("str value " + str);
            reference.setTableId("TargetTable$" + str);
            return new TableDestination(reference, null);
        }
    })
    .withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
        @Override
        public TableRow apply(TableRow input) {
            LOG.info("format function:" + input.toString());
            return input;
        }
    })
    .withSchema(schema1)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
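To make the table-ID logic in the snippet above easier to check on its own, here is a minimal sketch in plain Java with no Beam or BigQuery dependency. The class and method names (`MonthPartitionDecorator`, `firstOfMonth`, `tableIdFor`) are made up for illustration, and it assumes the date field is always an 8-character `yyyyMMdd` string.

```java
// Hypothetical helper, not part of Beam or BigQuery: isolates the
// "TargetTable$yyyyMM01" string building used in the destination function.
final class MonthPartitionDecorator {
    private MonthPartitionDecorator() {}

    /** Replaces the day part of a yyyyMMdd string with "01". */
    static String firstOfMonth(String yyyymmdd) {
        // Assumption: the value is exactly 8 characters (yyyyMMdd).
        if (yyyymmdd == null || yyyymmdd.length() != 8) {
            throw new IllegalArgumentException("Expected yyyyMMdd, got: " + yyyymmdd);
        }
        return yyyymmdd.substring(0, 6) + "01";
    }

    /** Builds the table ID with a partition decorator, e.g. "TargetTable$20171101". */
    static String tableIdFor(String baseTable, String yyyymmdd) {
        return baseTable + "$" + firstOfMonth(yyyymmdd);
    }

    public static void main(String[] args) {
        // prints "TargetTable$20171101"
        System.out.println(tableIdFor("TargetTable", "20171115"));
    }
}
```

Unlike the original `substring(0, str.length() - 2)`, this version rejects values of unexpected length instead of silently producing a wrong partition name.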
Right now I have to use a dynamic destination. Is there any solution that does this with Partition instead?
> With all the distinct dates, I need to partition my original data on the basis of those dates, and I will then write the resulting PCollections to different tables or to a partitioned table.
– BackBenChers Nov 15 '17 at 09:36
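The idea in the comment above, mapping each row to a partition by its date, can be sketched in plain Java as follows. This is not Beam code: `DatePartitionIndex` and its methods are hypothetical names, and the list of distinct dates stands in for whatever the side input would hold. Note that `Partition.of` takes its partition count as an `int` when the pipeline is constructed, so the sketch assumes the set of distinct dates is already known at that point.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper: maps a date to a stable partition index.
final class DatePartitionIndex {
    private DatePartitionIndex() {}

    /** Returns the distinct dates in ascending order; the list size is the partition count. */
    static List<String> distinctSorted(List<String> dates) {
        return new ArrayList<>(new TreeSet<>(dates));
    }

    /** Returns the index of the date in the sorted distinct list, in [0, size). */
    static int partitionFor(String date, List<String> distinctDates) {
        int index = Collections.binarySearch(distinctDates, date);
        if (index < 0) {
            // Assumption: every row's date is present in the distinct list.
            throw new IllegalArgumentException("Unknown date: " + date);
        }
        return index;
    }

    public static void main(String[] args) {
        List<String> distinct = distinctSorted(
                List.of("20171101", "20171001", "20171101", "20170901"));
        System.out.println(distinct.size());                    // prints 3
        System.out.println(partitionFor("20171001", distinct)); // prints 1
    }
}
```

In a pipeline, the body of `partitionFor(TableRow, int)` would do the same lookup on the row's date field, provided the distinct dates can be made available to the function.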