
I am trying to upsert records into Iceberg using Spark's MERGE INTO feature. I am using Spark 3.3.0 with Iceberg 0.14.0.

MERGE INTO syntax -

 MERGE INTO [db_name.]target_table [AS target_alias]
 USING [db_name.]source_table [<time_travel_version>] [AS source_alias]
 ON <merge_condition>
 [ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
 [ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
 [ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]
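
For reference, a filled-in statement with multiple clauses under this grammar could look like the sketch below (db.target, db.source and their columns are made up for illustration):

 MERGE INTO db.target t
 USING db.source s
 ON t.id = s.id
 WHEN MATCHED AND s.op = 'delete' THEN DELETE
 WHEN MATCHED THEN UPDATE SET *
 WHEN NOT MATCHED THEN INSERT *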

As mentioned, we can use the MERGE INTO functionality to upsert records into an Iceberg table. However, when I try to use WHEN MATCHED and WHEN NOT MATCHED clauses together, I get the error

 Caused by: org.apache.spark.sql.AnalysisException: unresolved operator 'ReplaceIcebergData RelationV2

If I use only one clause, it works.

My final query, which writes the data to Iceberg, is -

MERGE INTO default.segment_table old
USING (SELECT * FROM segment_final) new
ON old.uuid = new.uuid AND old.repeated_at IS NULL
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

However, when I try a single merge clause separately, like WHEN MATCHED THEN UPDATE SET * or WHEN NOT MATCHED THEN INSERT *, it works fine.

I have checked the open issues and the documentation and did not find anything about this.

I am unable to figure out what I am doing wrong here, or whether Iceberg has an issue with multiple clauses.

==========================Edited Part==================================

Adding more details related to the problem -

spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:0.14.0 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive \
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.local.type=hadoop \
--conf spark.sql.catalog.local.warehouse=$PWD/warehouse
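
If you prefer setting this up in code rather than via spark-shell flags, roughly the same configuration can be put on the SparkSession builder (a sketch, assuming the same catalog names and a local warehouse path; the iceberg-spark-runtime jar still has to be on the classpath, e.g. via --packages or --jars):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("iceberg-merge-into-repro")
  // enables Iceberg's SQL extensions, including MERGE INTO
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
  .config("spark.sql.catalog.spark_catalog.type", "hive")
  .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.local.type", "hadoop")
  .config("spark.sql.catalog.local.warehouse", "warehouse") // adjust the path as needed
  .getOrCreate()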

// creating Iceberg table
spark.sql("CREATE TABLE IF NOT EXISTS local.merge_into_issue.sample_data " +
    "(uuid string not null, user_id string, user_status string) using iceberg")


// adding records
spark.sql("INSERT INTO local.merge_into_issue.sample_data " +
    "VALUES ('1', '1', 'active'),('2', '1', 'inactive'), ('3', '2', 'active')")

// generating new data
spark.sql("SELECT '4' AS uuid, '2' AS user_id, 'suspended' AS user_status UNION SELECT '5' AS uuid, '3' AS user_id, 'added' AS user_status").createOrReplaceTempView("new_data")

val query =
"""
|MERGE INTO local.merge_into_issue.sample_data old
|USING (SELECT * FROM new_data) new
|ON old.user_id = new.user_id
|WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *
|""".stripMargin

spark.sql(query)

So, the exact problem here is the NOT NULL column in the sample_data table.

This has been fixed and will probably be released in the next version.

Affected versions - Spark 3.2 and 3.3
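
Until a release with the fix is available, one possible workaround (assuming the NOT NULL constraint on uuid is not strictly required; I have not verified this on every affected version) is to drop the constraint on the target column with Iceberg's ALTER COLUMN DDL and retry the merge:

// drop the NOT NULL constraint on the target column, then retry the MERGE
spark.sql("ALTER TABLE local.merge_into_issue.sample_data ALTER COLUMN uuid DROP NOT NULL")
spark.sql(query)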
