2

I am pushing some initial bulk data into a hudi table, and then every day, I write incremental data into it. But if back data arrives, then the latest precombined field which is already in the table is ignored and the arriving precombined field(which is older) over writes it.

I write a data frame containing the following data with the following configs:

+---+-----+-------------+
| id|  req|dms_timestamp|
+---+-----+-------------+
|  1|  one|   2022-12-17|
|  2|  two|   2022-12-17|
|  3|three|   2022-12-17|
+---+-----+-------------+

      "className"-> "org.apache.hudi",
      "hoodie.datasource.write.precombine.field"-> "dms_timestamp",
      "hoodie.datasource.write.recordkey.field"-> "id",
      "hoodie.table.name"-> "hudi_test",
      "hoodie.consistency.check.enabled"-> "false",
      "hoodie.datasource.write.reconcile.schema"-> "true",
      "path"-> basePath,
      "hoodie.datasource.write.keygenerator.class"-> "org.apache.hudi.keygen.ComplexKeyGenerator",
      "hoodie.datasource.write.partitionpath.field"-> "",
      "hoodie.datasource.write.hive_style_partitioning"-> "true",

      "hoodie.upsert.shuffle.parallelism"-> "1",
      "hoodie.datasource.write.operation"-> "upsert",
      "hoodie.cleaner.policy"-> "KEEP_LATEST_COMMITS",
      "hoodie.cleaner.commits.retained"-> "5",
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+-----+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                       |id |req  |dms_timestamp|
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+-----+-------------+
|20221214130513893  |20221214130513893_0_0|id:3              |                      |005674e6-a581-419a-b8c7-b2282986bc52-0_0-36-34_20221214130513893.parquet|3  |three|2022-12-17   |
|20221214130513893  |20221214130513893_0_1|id:1              |                      |005674e6-a581-419a-b8c7-b2282986bc52-0_0-36-34_20221214130513893.parquet|1  |one  |2022-12-17   |
|20221214130513893  |20221214130513893_0_2|id:2              |                      |005674e6-a581-419a-b8c7-b2282986bc52-0_0-36-34_20221214130513893.parquet|2  |two  |2022-12-17   |
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+-----+-------------+

Then In the next run I upsert the following data:

+---+----+-------------+
| id| req|dms_timestamp|
+---+----+-------------+
|  1|null|   2019-01-01|
+---+----+-------------+

      "hoodie.table.name"-> "hudi_test",
      "hoodie.datasource.write.recordkey.field" -> "id",
      "hoodie.datasource.write.precombine.field" -> "dms_timestamp",

      // get_common_config
      "className"-> "org.apache.hudi",
      "hoodie.datasource.hive_sync.use_jdbc"-> "false",
      "hoodie.consistency.check.enabled"-> "false",
      "hoodie.datasource.write.reconcile.schema"-> "true",
      "path"-> basePath,

      // get_partitionDataConfig -- no partitionfield
      "hoodie.datasource.write.keygenerator.class"-> "org.apache.hudi.keygen.ComplexKeyGenerator",
      "hoodie.datasource.write.partitionpath.field"-> "",
      "hoodie.datasource.write.hive_style_partitioning"-> "true",

      // get_incrementalWriteConfig
      "hoodie.upsert.shuffle.parallelism"-> "1",
      "hoodie.datasource.write.operation"-> "upsert",
      "hoodie.cleaner.policy"-> "KEEP_LATEST_COMMITS",
      "hoodie.cleaner.commits.retained"-> "5",

and getting this table:

+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+-----+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                       |id |req  |dms_timestamp|
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+-----+-------------+
|20221214131440563  |20221214131440563_0_0|id:3              |                      |37dee403-6077-4a01-bf28-7afd65ef390a-0_0-18-21_20221214131555500.parquet|3  |three|2022-12-17   |
|20221214131555500  |20221214131555500_0_1|id:1              |                      |37dee403-6077-4a01-bf28-7afd65ef390a-0_0-18-21_20221214131555500.parquet|1  |null |2019-01-01   |
|20221214131440563  |20221214131440563_0_2|id:2              |                      |37dee403-6077-4a01-bf28-7afd65ef390a-0_0-18-21_20221214131555500.parquet|2  |two  |2022-12-17   |
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---+-----+-------------+

This should not happen as this is back-date data arriving late in the stream. How to handle this?

awadhesh14
  • 89
  • 7

1 Answers1

1

By default, Hudi uses org.apache.hudi.common.model.OverwriteWithLatestAvroPayload as payload class, with this class, Hudi uses the precombine field just to deduplicate the incoming data (precombine step), then it overwrites the existing record with the new one without comparing the precombine field values.

If you want to always keep the last updated record, you need to add this configuration:

"hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.model.DefaultHoodieRecordPayload"
Hussein Awala
  • 4,285
  • 2
  • 9
  • 23