2

I know that we have to create TupleTags before we create side outputs. But what if I don't know beforehand how many side outputs must be created.

I need to create side outputs depending on one of the columns of data within Apache Beam. Is there a way to achieve this?

Dynamic Destinations code:

readTableRow.apply("Write output to BigQuery",BigQueryIO.writeTableRows()
          .to(new DynamicDestinations<TableRow, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public List<PCollectionView<?>> getSideInputs() {
                    return ImmutableList.of(distinctDatesView);
                }

             @Override
              public String getDestination(ValueInSingleWindow<TableRow> element) {
                List<String> list = sideInput(distinctDatesView);
                LOG.info("Element: "+element);
                String tableSpec = "project-id:dataset-id.table-id";

                for(String str:list)
                {
                 if(element.getValue().get("date").equals(str))
                 {
                     tableSpec = "project-id:dataset-id.table-id$"+str;
                    return tableSpec;
                 }
                }
                return tableSpec;
             }

            @Override
            public TableSchema getSchema(String arg0) {
                List<TableFieldSchema> fields = new ArrayList<>();
                fields.add(new TableFieldSchema().setName("letter").setType("STRING"));
                fields.add(new TableFieldSchema().setName("date").setType("STRING"));
                TableSchema schema = new TableSchema().setFields(fields);
                return schema;
            }

            @Override
            public TableDestination getTable(String arg1) {
                LOG.info("destination: "+arg1); //arg1 = PROJECT:DATASET.MYTABLE$20171102
                String tableSpec = arg1;
                TableReference reference = new TableReference();

                return new TableDestination(tableSpec, "");
            }                   
          })
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE).withoutValidation());

Error that I'm getting:

    (5bd903d2a90b859e): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.io.IOException: Unable to patch table description: {datasetId=myDATASET, projectId=myPROJECT, tableId=myTABLE$20171105}, aborting after 9 retries.
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:104)
    at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:54)
    at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:37)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:117)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:74)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:113)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:187)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:148)
    at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:68)
    at com.google.cloud.dataflow.worker.DataflowWorker.executeWork(DataflowWorker.java:330)
    at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:302)
    at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:251)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.io.IOException: Unable to patch table description: {datasetId=myDATASET, projectId=myPROJECT, tableId=myTABLE$20171105}, aborting after 9 retries.
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
    at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:324)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)
    at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
    at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
    at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement(ReshuffleOverrideFactory.java:84)
    at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
    at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:324)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:180)
    ... 21 more
Caused by: java.io.IOException: Unable to patch table description: {datasetId=myDATASET, projectId=myPROJECT, tableId=myTABLE$20171105}, aborting after 9 retries.
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.executeWithRetries(BigQueryServicesImpl.java:879)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.patchTableDescription(BigQueryServicesImpl.java:826)
    at org.apache.beam.sdk.io.gcp.bigquery.WriteTables.load(WriteTables.java:250)
    at org.apache.beam.sdk.io.gcp.bigquery.WriteTables.access$600(WriteTables.java:77)
    at org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn.processElement(WriteTables.java:141)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "Invalid table ID \"myTABLE$20171105\". Table IDs must be alphanumeric (plus underscores) and must be at most 1024 characters long. Also, Table decorators cannot be used.",
    "reason" : "invalid"
  } ],
  "message" : "Invalid table ID \"myTABLE$20171105\". Table IDs must be alphanumeric (plus underscores) and must be at most 1024 characters long. Also, Table decorators cannot be used."
}
    at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1065)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.executeWithRetries(BigQueryServicesImpl.java:870)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.patchTableDescription(BigQueryServicesImpl.java:826)
    at org.apache.beam.sdk.io.gcp.bigquery.WriteTables.load(WriteTables.java:250)
    at org.apache.beam.sdk.io.gcp.bigquery.WriteTables.access$600(WriteTables.java:77)
    at org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn.processElement(WriteTables.java:141)
    at org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
    at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:324)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)
    at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
    at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
    at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement(ReshuffleOverrideFactory.java:84)
    at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
    at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:324)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:180)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:104)
    at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:54)
    at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:37)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:117)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:74)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:113)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:187)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:148)
    at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:68)
    at com.google.cloud.dataflow.worker.DataflowWorker.executeWork(DataflowWorker.java:330)
    at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:302)
    at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:251)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Thanks.

rish0097
  • 1,024
  • 2
  • 18
  • 39
  • Please explain more of your use case. As stated, it is impossible, because you'd need to also *do something* with the PCollections you're dynamically creating, and that would require adding new steps to the pipeline dynamically. This is not supported. However, if you're doing something like "write different data to different files or BigQuery tables" - that is supported. – jkff Nov 17 '17 at 17:46
  • @jkff I want to create side outputs based on a date column in the data. The resulting PCollections will then have data of the same date. Finally I want to write them in a single partitioned table. – rish0097 Nov 17 '17 at 19:37
  • If you're writing to different partitions of a BigQuery table based on date, see https://stackoverflow.com/questions/43505534/writing-different-values-to-different-bigquery-tables-in-apache-beam/43505535 – jkff Nov 17 '17 at 19:43
  • Could you use a Map side-input, so that you have a single side-input that is keyed by the date column? – Ben Chambers Nov 17 '17 at 23:51
  • @jkff I'm using Dynamic destinations since I'm getting the destination table and schema at runtime and as per my knowledge, partitioning the data over there based on the link you gave won't be possible. Is there any way to partition the data using Partition transform or Side Outputs or anything else before I write the data to BigQuery? But then again the number of partitions will be dynamic based on the dates in the data. – rish0097 Nov 22 '17 at 10:33
  • @rish0097 Partitioning data dynamically into multiple tables is exactly what DynamicDestinations are intended for; ability to infer schema at runtime and so on are just nice gimmicks on top of that. Can you update your question with your current DynamicDestinations code and describe what trouble you're having with getting it to partition your data? – jkff Nov 22 '17 at 17:01
  • @jkff I've added the Dynamic Destinations code that I'm using. How to choose separate destinations for different rows of data using the above code. – rish0097 Nov 23 '17 at 08:22
  • I see your code uses the same bogus destination for all elements, and ignores the destination in all methods. The point of dynamic destinations is to have 1 destination per table. The destination identifies the table. Eg if you want one table per category of products of type Category, the destination type should be Category. If you want one table per day, it should be probably Instant. Implementations of other methods follow naturally once you've chosen what you'll use as your destination type. – jkff Nov 23 '17 at 14:58
  • For creating a table name according to BigQuery time partitioning, take a look at "partition decorators" – jkff Nov 23 '17 at 14:59
  • @jkff ok but my table is already present within Big Query. I just have to add data within it, in different partitions. If possible can you please provide an example of how to put different rows of data in different tables depending on a certain condition using Dynamic Destinations. That will really be helpful. – rish0097 Nov 23 '17 at 17:40
  • @jkff I've updated the post with my latest code and error that I'm getting. I'm able to set the destinations correctly depending on the row. But somehow it's not accepting $ (table decorator) in TableDestination. But the same thing works well without DynamicDestinations when we use SerializableFunction as suggested in the link shared by you in one of your previous comments. Can you please help with this issue. – rish0097 Nov 24 '17 at 09:39
  • This was fixed in Beam 2.2.0 - I believe the release has just been finalized. https://issues.apache.org/jira/browse/BEAM-2870 Please try updating to 2.2.0. – jkff Nov 25 '17 at 06:42
  • By the way, the getDestination() code looks extremely inefficient: why scan the entire side input for every single element you're writing to BigQuery, only to find a value that equals something that is already available in your element's field? Just use element.getValue().get("date") itself as the destination! Or am I misunderstanding something? – jkff Nov 25 '17 at 06:44
  • @jkff I'm using Beam 2.2 only...do I have to use a backslash or something? – rish0097 Nov 26 '17 at 12:56
  • Can you include the complete exception stacktrace rather than just the innermost error message? Also, do you mean you're using 2.2.0-SNAPSHOT? 2.2.0 was released only yesterday or so. Your SNAPSHOT version might be out of date in your local maven cache. – jkff Nov 26 '17 at 16:04
  • @jkff Oh ok...thank you for that information...I'll create a new 2.2 project and try it out. And yeah I was using a snapshot of 2.2! Will update you soon :) – rish0097 Nov 26 '17 at 16:47
  • @jkff I'm running the code in Beam 2.2 and partitioned data is getting inserted correctly. But for some reason the dataflow job fails everytime at BigQueryIO.write() step giving the error that I have mentioned in the post. – rish0097 Nov 27 '17 at 10:12
  • @jkff I've updated the post with the entire error message. – rish0097 Nov 27 '17 at 10:31
  • Hmm, thanks, this might be a bug. We'll take a look. Apparently https://issues.apache.org/jira/browse/BEAM-2870 fixed only the streaming case, but the error also happens in the batch case. – jkff Nov 27 '17 at 16:39
  • Fix out for review https://github.com/apache/beam/pull/4177 . Unfortunately it won't make it into 2.2, so when it's merged you'll have to use 2.3.0-SNAPSHOT. Meanwhile you can try patching it and building your own version of Beam and running your job with that. – jkff Nov 27 '17 at 18:36

0 Answers0