
BigQuery supports de-duplication for streaming inserts. How can I use this feature with Apache Beam?

https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataconsistency

To help ensure data consistency, you can supply insertId for each inserted row. BigQuery remembers this ID for at least one minute. If you try to stream the same set of rows within that time period and the insertId property is set, BigQuery uses the insertId property to de-duplicate your data on a best effort basis. You might have to retry an insert because there's no way to determine the state of a streaming insert under certain error conditions, such as network errors between your system and BigQuery or internal errors within BigQuery. If you retry an insert, use the same insertId for the same set of rows so that BigQuery can attempt to de-duplicate your data. For more information, see troubleshooting streaming inserts.

I cannot find such a feature in the Javadoc. https://beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html

In this question, the answerer suggests setting insertId in TableRow. Is this correct?

https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/TableRow.html?is-external=true

The BigQuery client library has this feature:

https://googleapis.github.io/google-cloud-java/google-cloud-clients/apidocs/index.html?com/google/cloud/bigquery/package-summary.html
https://github.com/googleapis/google-cloud-java/blob/master/google-cloud-clients/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/InsertAllRequest.java#L134
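
For illustration, a minimal sketch of setting an insertId through that client library directly (outside Beam); the dataset, table, and row contents here are placeholder assumptions:

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.Map;

public class InsertWithInsertId {
  public static void main(String[] args) {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    TableId table = TableId.of("my_dataset", "my_table"); // placeholder table

    Map<String, Object> content = new HashMap<>();
    content.put("event_id", "evt-123");   // placeholder columns
    content.put("payload", "hello");

    // RowToInsert.of(id, content) sets the per-row insertId that BigQuery
    // uses for best-effort de-duplication (remembered for at least a minute).
    InsertAllRequest request =
        InsertAllRequest.newBuilder(table)
            .addRow(InsertAllRequest.RowToInsert.of("evt-123", content))
            .build();

    InsertAllResponse response = bigquery.insertAll(request);
    if (response.hasErrors()) {
      response.getInsertErrors().forEach(
          (index, errors) -> System.err.println("Row " + index + ": " + errors));
    }
  }
}
```

RowToInsert.of(id, content) corresponds to the insertId field described in the streaming docs quoted above.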

Yohei Onishi
  • Can you specify more about your use case? Dataflow/Beam should provide exactly-once behavior when coupled with BigQuery, without you needing to specify an insertId manually. – Felipe Hoffa Jan 10 '19 at 04:56
  • My use case is mentioned above: I want to de-duplicate when inserting into BigQuery. So should I just specify insertId as a column in the new row? – Yohei Onishi Jan 10 '19 at 05:08
  • I understand you want to de-duplicate. But depending on the source of the duplication, this might already be a solved problem. – Felipe Hoffa Jan 10 '19 at 05:53
  • There is no duplication on the data source side. But since Kafka supports at-least-once delivery by default, I think there is a possibility of duplication between the Kafka producer and consumer. I also guess Dataflow might insert the same row more than once when retrying on some errors (e.g. a temporary network issue). So I want to know how I can avoid duplication in both cases. This question is about streaming inserts from Dataflow to BigQuery. – Yohei Onishi Jan 10 '19 at 06:00
  • In my actual use case, the requirement for de-duplication is not so strong, so I think the easiest way is just to insert into BigQuery and then de-duplicate at query time. But I wanted to know whether BigQueryIO (Apache Beam) supports a de-duplication feature. – Yohei Onishi Jan 10 '19 at 06:25

2 Answers

  • Pub/Sub + Beam/Dataflow + BigQuery: "Exactly once" should be guaranteed, and you don't need to worry much about this. That guarantee is stronger when you ask Dataflow to insert to BigQuery using FILE_LOADS instead of STREAMING_INSERTS, for now.

  • Kafka + Beam/Dataflow + BigQuery: If a message can be emitted more than once from Kafka (e.g. if the producer retried the insertion), then you need to take care of de-duplication. Either in BigQuery (as currently implemented, according to your comment), or in Dataflow with a .apply(Distinct.create()) transform (see the sketch after this list).
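
Not part of the original answer, but a minimal sketch of both suggestions in Beam Java, assuming an in-memory Create source standing in for Kafka; the table name, the "value" column, and the DedupToBigQuery class are placeholders:

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class DedupToBigQuery {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(Create.of("a", "b", "a"))              // stand-in for a Kafka source
        // De-duplicate inside the pipeline (per window in streaming mode).
        .apply(Distinct.<String>create())
        .apply(MapElements.into(TypeDescriptor.of(TableRow.class))
            .via((String v) -> new TableRow().set("value", v))) // placeholder schema
        .apply(BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table")  // placeholder table
            // FILE_LOADS gives the stronger guarantee mentioned above.
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run();
  }
}
```

With FILE_LOADS the rows go through load jobs rather than the streaming API, which is why the guarantee is stronger; note that Distinct de-duplicates within each window in a streaming pipeline.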

Felipe Hoffa
  • Thanks! But my original question is how to use BigQuery's de-duplication feature from Apache Beam. – Yohei Onishi Jan 11 '19 at 02:58
  • You can't manually, because Dataflow is already using insertId itself to implement "exactly once", as described. – Felipe Hoffa Jan 11 '19 at 03:42
  • OK I see. Thank you for clarification. – Yohei Onishi Jan 11 '19 at 07:07
  • Thanks for asking! I had to ask some experts to get to this answer :). Including Pablo, who improved my answer above – Felipe Hoffa Jan 11 '19 at 08:38
  • And I cannot find the `.apply(Distinct.create())` transform in the Apache Beam documentation, so it would be helpful if you could mention it there. – Yohei Onishi Jan 11 '19 at 09:06
  • https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/transforms/Distinct.html – Felipe Hoffa Jan 11 '19 at 10:21
  • I mean it is not easy to find in the Javadoc without any explanation on the Apache Beam website. – Yohei Onishi Jan 11 '19 at 14:31
  • I'm confused now – Felipe Hoffa Jan 11 '19 at 18:03
  • "Dataflow is already using insertId for itself to implement "exactly once" as described" Can I see how this is implemented? Would you provide the link to this implementation? Thanks. – Yohei Onishi Jan 15 '19 at 06:50
  • Re: "That guarantee is stronger when you ask Dataflow to insert to BigQuery using FILE_LOADS instead of STREAMING_INSERTS, for now". Is it really a "guarantee" that duplicates can't possibly happen? If so, why is one stronger than the other? – dliu Oct 16 '21 at 00:09

As Felipe mentioned in the comments, it seems that Dataflow already uses insertId itself to implement "exactly once", so we cannot manually specify an insertId.

Yohei Onishi