
I'm trying out Hudi, Delta Lake, and Iceberg on the AWS Glue v3 engine (Spark 3.1) and have both Delta Lake and Iceberg running fine end to end using a test pipeline I built with test data. Note that I am not using any of the Glue Custom Connectors. I'm using PySpark and standard Spark code (not the Glue classes that wrap the standard Spark classes).

For Hudi, installing the Hudi jar works fine: I can write the table in the Hudi format, create the table DDL in the Glue Catalog, and read the table via Athena. However, when I try to run a CRUD statement on the newly created table, I get errors. For example, running a simple SparkSQL DELETE statement fails with the error: 'DELETE is only supported with v2 tables.'
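
For reference, the failing statement is just a plain SparkSQL DELETE (the table name and predicate here are illustrative):

# Illustrative repro; any simple DELETE against the Hudi table fails the same way
spark.sql("DELETE FROM db.table_name WHERE id = '123'")
# raises: DELETE is only supported with v2 tables.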

I've added the following jars when building the SparkSession:

  • org.apache.hudi:hudi-spark3.1-bundle_2.12:0.11.0
  • com.amazonaws:aws-java-sdk:1.10.34
  • org.apache.hadoop:hadoop-aws:2.7.3

And I set the following config for the SparkSession (a sketch of the full session setup follows the list):

  • self.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
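
Putting those together, the session setup looks roughly like this (a sketch: the app name and the use of spark.jars.packages are my wiring, while the package list and serializer config are exactly the ones listed above):

from pyspark.sql import SparkSession

# Sketch of the session setup; the packages and serializer config match the
# bullets above, the rest of the builder wiring is illustrative
spark = SparkSession.builder \
    .appName('hudi-test') \
    .config('spark.jars.packages',
            'org.apache.hudi:hudi-spark3.1-bundle_2.12:0.11.0,'
            'com.amazonaws:aws-java-sdk:1.10.34,'
            'org.apache.hadoop:hadoop-aws:2.7.3') \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .getOrCreate()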

I've tried several variations of writing the data and creating the table, including:

hudi_options = {
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.table.version': 2,
    'hoodie.table.name': 'db.table_name',
    'hoodie.datasource.write.recordkey.field': 'id',  # a record key is required for the table
    'hoodie.datasource.write.partitionpath.field': '',
    'hoodie.datasource.write.table.name': 'db.table_name',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'date_modified',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}

df.write \
    .format('hudi') \
    .options(**hudi_options) \
    .mode('overwrite') \
    .save('s3://...')
sql = f"""CREATE TABLE {FULL_TABLE_NAME}
                USING {DATA_FORMAT}
                options (
                    type = 'cow',
                    primaryKey = 'id',
                    preCombineField = 'date_modified',
                    partitionPathField = '',
                    hoodie.table.name = 'db.table_name',
                    hoodie.datasource.write.recordkey.field = 'id',
                    hoodie.datasource.write.precombine.field = 'date_modified',
                    hoodie.datasource.write.partitionpath.field = '',
                    hoodie.table.version = 2
                )
                LOCATION '{WRITE_LOC}'
                AS SELECT * FROM {SOURCE_VIEW};"""
spark.sql(sql)

The above works fine; both the write and the CTAS succeed. It's when I run a CRUD operation against the newly created table that things break. For instance, deleting records via a SparkSQL DELETE statement (as shown above) fails with 'DELETE is only supported with v2 tables.' I can't figure out why Spark considers this not to be a v2 table. Any clues would be hugely appreciated.
