
I'm using Apache Beam (Java SDK) to insert records into BigQuery using the batch load method (file loads). I want to retrieve the records that fail during insertion.

Is it possible to have a retry policy on failed records?

Below is my code:

public static void insertToBigQueryDataLake(
        final PCollectionTuple dataStoresCollectionTuple,
        final TupleTag<KV<DataLake, PayloadSpecs>> dataLakeValidTag,
        final Long loadJobTriggerFrequency,
        final Integer loadJobNumShard) {


    WriteResult writeResult = dataStoresCollectionTuple
            .get(dataLakeValidTag)
            .apply(TRANSFORMATION_NAME, DataLakeTableProcessor.dataLakeTableProcessorTransform())
            .apply(
                    WRITING_EVENTS_NAME,
                    BigQueryIO.<KV<DataLake, TableRowSpecs>>write()
                            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                            .withTriggeringFrequency(Duration.standardMinutes(loadJobTriggerFrequency))
                            .withNumFileShards(loadJobNumShard)
                            .to(new DynamicTableRowDestinations<>(IS_DATA_LAKE))
                            .withFormatFunction(BigQueryServiceImpl::dataLakeTableRow));

    writeResult.getFailedInserts().apply(ParDo.of(new DoFn<TableRow, Void>() {
        @ProcessElement
        public void processElement(final ProcessContext processContext) throws IOException {
            System.out.println("Table Row : " + processContext.element().toPrettyString());
        }
    }));

}
Vadim Kotov

1 Answer


You can use the getFailedInsertsWithErr() method to push the failed inserts, together with their errors, to another table for root cause analysis (RCA); check here for more details.

Example:
// write failed rows with their error to error table                
writeResult
        .getFailedInsertsWithErr()
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))))
        .apply("BQ-insert-error-extract", ParDo.of(new BigQueryInsertErrorExtractFn(tableRowToInsertView)).withSideInputs(tableRowToInsertView))
        .apply("BQ-insert-error-write", BigQueryIO.writeTableRows()
                .to(errTableSpec)
                .withJsonSchema(errSchema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
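A note on where those failed rows come from: as far as I know, getFailedInsertsWithErr() is only populated for streaming inserts, and it needs withExtendedErrorInfo() set on the write. With streaming inserts you can also choose a retry policy through withFailedInsertRetryPolicy. Below is a minimal sketch, assuming a plain PCollection<TableRow> and a destination table that already exists; the class name, method name and the tableSpec parameter are made up for illustration and are not from the question's code.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical helper class, for illustration only.
public final class FailedInsertSketch {

  private FailedInsertSketch() {}

  // Writes rows with streaming inserts and returns the rows BigQuery rejected,
  // each paired with its error details.
  static PCollection<BigQueryInsertError> writeWithRetryPolicy(
      final PCollection<TableRow> rows, final String tableSpec) {

    WriteResult result =
        rows.apply(
            "WriteRows",
            BigQueryIO.writeTableRows()
                .to(tableSpec)
                // Failed-insert output applies to streaming inserts.
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                // Retry only errors considered transient; other failures are
                // emitted on the failed-insert output instead of being retried.
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                // Required so that getFailedInsertsWithErr() can be used.
                .withExtendedErrorInfo()
                // Assumption: the table already exists, so no schema is supplied.
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

    return result.getFailedInsertsWithErr();
  }
}
```

Each BigQueryInsertError exposes getRow(), getError() and getTable(), which is what an extract step like the one above would turn into rows for the error table. InsertRetryPolicy also offers neverRetry() and alwaysRetry() if a different trade-off is wanted.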
SANN3
Hi bro, we are able to get the insertion errors and the records, but only after 1000 retries, which takes more than 7 hours. Is there any way to set the retry policy here? I'm doing file loads (batch load). – Ashutosh Dixit Jul 21 '21 at 05:18
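
On the retry count for file loads: as far as I can tell, the per-row insert retry policy does not apply here, because rows are written through load jobs rather than individual inserts. What can be tuned is how many times a failed load job is retried, using withMaxRetryJobs on the write. A rough sketch follows, with the value 5 picked arbitrarily and the class, method and parameter names hypothetical; check the behaviour against your Beam version before relying on it.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical helper class, for illustration only.
public final class FileLoadRetrySketch {

  private FileLoadRetrySketch() {}

  // Writes rows through BigQuery load jobs, capping how often a failed
  // load job is retried before the write gives up.
  static WriteResult writeWithBoundedJobRetries(
      final PCollection<TableRow> rows, final String tableSpec) {

    return rows.apply(
        "WriteRowsViaLoadJobs",
        BigQueryIO.writeTableRows()
            .to(tableSpec)
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            // Arbitrary example value; lower it to fail faster.
            .withMaxRetryJobs(5)
            // Assumption: the table already exists, so no schema is supplied.
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    // For an unbounded input, withTriggeringFrequency and withNumFileShards
    // from the question's code would still be needed on this write.
  }
}
```

Note that this limits retries of whole load jobs; it does not produce a per-record failure output the way streaming inserts do.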