I want to pass a side input to a PCollection Partition and divide my PCollection based on it. Is there any way to do that?

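// partitionCount is a placeholder for the result of my PCollection count function;
// Partition.of takes it as a plain int, so it is fixed when the pipeline is constructed.
PCollectionList<TableRow> part = merged.apply(Partition.of(partitionCount, new PartitionFn<TableRow>() {

                                        @Override
                                        public int partitionFor(TableRow row, int numPartitions) {
                                            // Must return an index in [0, numPartitions); always 0 for now.
                                            return 0;
                                        }

                                    }));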

Is there any other way to partition my PCollection?

// Partitioning the BigQuery table without DynamicDestinations

merge.apply("write into target", BigQueryIO.writeTableRows()
                                         .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
                                                @Override
                                                public TableDestination apply(ValueInSingleWindow<TableRow> value) {
                                                       TableRow row = value.getValue();
                                                       TableReference reference = new TableReference();
                                                       reference.setProjectId("XYZ");
                                                       reference.setDatasetId("ABC");
                                                       // Read the row's date (yyyymmdd) and log it.
                                                       String str = row.get("authorized_transaction_date_yyyymmdd").toString();
                                                       LOG.info("date of row " + str);
                                                       // Replace the day with "01" so that all rows of a month share one partition.
                                                       str = str.substring(0, str.length() - 2) + "01";
                                                       LOG.info("str value " + str);
                                                       // "$" is the partition decorator: TargetTable$yyyymm01.
                                                       reference.setTableId("TargetTable$" + str);
                                                       return new TableDestination(reference, null);
                                                }
                                         }).withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
                                                @Override
                                                public TableRow apply(TableRow input) {
                                                       LOG.info("format function:"+input.toString());

                                                       return input;
                                                }
                                         })

                                         .withSchema(schema1).withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                                         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

Now I have to use DynamicDestinations instead of this and still partition the data. Is there any solution?

BackBenChers

1 Answer

Based on seeing TableRow in your code, I suspect that you want to write a PCollection to BigQuery, sending different elements to different BigQuery tables. BigQueryIO.write() already provides a method to do that, using BigQueryIO.write().to(DynamicDestinations). See "Writing different values to different BigQuery tables in Apache Beam".
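
As a rough illustration of why no Partition transform or side input should be needed, the month-level destination can be computed from each row on its own. The sketch below is plain Java with no Beam dependency: MonthPartition and its methods are hypothetical names introduced here, and the yyyymmdd input format is assumed from the question's authorized_transaction_date_yyyymmdd field. A DynamicDestinations implementation could call a helper like this from getDestination or getTable.

```java
// Hypothetical helper, not part of Apache Beam. It mirrors the question's
// logic: replace the day of a yyyymmdd date with "01" so that every row of
// a month maps to the same partition decorator.
final class MonthPartition {

    private MonthPartition() {}

    /** Returns the date with its day set to "01", e.g. 20171115 becomes 20171101. */
    static String monthPartitionId(String yyyymmdd) {
        // Assumption: dates arrive as exactly eight digits (yyyymmdd).
        if (yyyymmdd == null || !yyyymmdd.matches("\\d{8}")) {
            throw new IllegalArgumentException("expected a yyyymmdd date, got: " + yyyymmdd);
        }
        return yyyymmdd.substring(0, 6) + "01";
    }

    /** Builds a table id with a partition decorator, e.g. TargetTable$20171101. */
    static String tableId(String baseTable, String yyyymmdd) {
        return baseTable + "$" + monthPartitionId(yyyymmdd);
    }

    public static void main(String[] args) {
        // Two November rows share one partition; the December row gets another.
        System.out.println(tableId("TargetTable", "20171115")); // TargetTable$20171101
        System.out.println(tableId("TargetTable", "20171130")); // TargetTable$20171101
        System.out.println(tableId("TargetTable", "20171201")); // TargetTable$20171201
    }
}
```

Because the destination depends only on the element itself, the set of distinct dates does not have to be known in advance; rows with the same month simply end up with the same table id.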

jkff
  • Thanks, @jkff. But in my case I am partitioning the table because I have to divide my PCollection based on the distinct dates, which I keep in the form of a view, and then pass the resulting PCollections to DynamicDestinations. Since I can access that list view in DynamicDestinations, I can partition the BigQuery table based on those dates. Example: reference.setTableId("TargetTable$" + str); return new TableDestination(reference, null); – BackBenChers Nov 15 '17 at 08:38
  • The method in DynamicDestinations returns a TableDestination, where I need to set the date from that view on the reference and return new TableDestination(reference, null). But for that I need to know that my PCollection is partitioned by those dates only. That's why I can't use side outputs to divide my PCollection either: the number of divisions is also dynamic. Using a for loop, I will extract each PCollection and each date from DistictDatesView one by one and set it on the table. – BackBenChers Nov 15 '17 at 08:41
  • BigQueryIO.write().to(DynamicDestinations) can be used for writing partitioned tables too (those with "$"), which I think is what you want. You shouldn't need side inputs, side outputs, or the Partition transform to accomplish that. A single application of write().to(DynamicDestinations) will do everything. Sorry, I'm having a hard time understanding the rest of your comments. – jkff Nov 15 '17 at 09:13
  • Sorry for that, @jkff. The scenario in my case is that BigQuery lets us partition by date, but I have to partition the BigQuery table by month. I have done that without using DynamicDestinations; part of the code is available at: https://docs.google.com/document/d/1wsUan8oWP7Fw2Hcx42f2vyXW11aWbKX5ZwV5tCkzUCQ/edit?usp=sharing – BackBenChers Nov 15 '17 at 09:28
  • I have updated the question and added the code; please take a look. The scenario is that I have to partition a BigQuery table by month, so in the code above I replace the day of each date with 01. If all rows of a month share the same day, the data ends up partitioned by month. – BackBenChers Nov 15 '17 at 09:34
  • In my case the destination table is dynamic, and the data is also partitioned by month. That's why I made a PCollection with all the distinct dates; I need to partition my original data based on those dates and then dump the resulting PCollections into different tables or table partitions. – BackBenChers Nov 15 '17 at 09:36