I'm new to Hudi and I have a problem. I'm working on an AWS EMR cluster with PySpark and Kafka. What I want to do is read a topic from the Kafka cluster with PySpark Structured Streaming and write it to S3 in Hudi format. To be honest, I've been trying for a few weeks and I'm starting to wonder whether it is possible at all. Can someone help me, please? The code I'm working with is:
# Read the Kafka topic as a streaming DataFrame
df_T = spark.readStream \
    .format("kafka") \
    .options(**options_read) \
    .option("subscribe", topic) \
    .load()
....
hudi_options = {
    'hoodie.table.name': MyTable,
    'hoodie.datasource.write.table.name': MyTable,
    'hoodie.datasource.write.recordkey.field': MyKeyInTable,
    'hoodie.datasource.write.partitionpath.field': MyPartitionKey,
    'hoodie.datasource.write.hive_style_partitioning': "true",
    'hoodie.datasource.write.row.writer.enable': "false",
    'hoodie.datasource.write.operation': 'bulk_insert',
    'hoodie.datasource.write.precombine.field': MyTimeStamp,
    'hoodie.insert.shuffle.parallelism': 1,
    'hoodie.consistency.check.enabled': "true",
    'hoodie.cleaner.policy': "KEEP_LATEST_COMMITS",
    'hoodie.datasource.write.storage.type': 'MERGE_ON_READ',
    'hoodie.compact.inline': "false",
    'hoodie.datasource.hive_sync.table': MyTable,
    'hoodie.datasource.hive_sync.partition_fields': MyPartitionKey,
    'hoodie.datasource.hive_sync.database': Mydatabase,
    'hoodie.datasource.hive_sync.auto_create_database': "true",
    'hoodie.datasource.write.keygenerator.class': "org.apache.hudi.keygen.ComplexKeyGenerator",
    'hoodie.datasource.hive_sync.partition_extractor_class': "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.skip_ro_suffix': 'true'
}
....
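# Start the streaming write to Hudi and keep the query handle
ds = df_T \
    .writeStream \
    .outputMode('append') \
    .format("org.apache.hudi") \
    .options(**hudi_options) \
    .option('checkpointLocation', MyCheckpointLocation) \
    .start(MyPathLocation)

# Block for up to 300 seconds, then return control to the script
ds.awaitTermination(300)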
....
This code runs on EMR without reporting any errors, but when I look for the Hudi files in S3, none have been created. I know the Kafka configuration works, because when I write the same stream to the 'console' sink instead, the records are printed correctly. Can someone help me?
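
To make "it does not create any files" more precise, here is a minimal sketch of how a listing of the table path could be checked. It assumes that a Hudi table keeps its metadata in a `.hoodie` folder under the base path, with a `hoodie.properties` file and one timeline file per completed commit (`.commit`, or `.deltacommit` for MERGE_ON_READ). The helper name `summarize_hudi_listing` and the result keys are hypothetical, and the input is just a list of object keys, for example copied from a recursive listing of `MyPathLocation`.

```python
from typing import Dict, Iterable

# Assumption: completed timeline instants end in ".commit" (copy-on-write)
# or ".deltacommit" (merge-on-read); unfinished ones carry an extra
# ".requested" or ".inflight" suffix.
COMPLETED_SUFFIXES = (".commit", ".deltacommit")
PENDING_SUFFIXES = (".requested", ".inflight")
MARKER = "/.hoodie/"


def summarize_hudi_listing(keys: Iterable[str]) -> Dict[str, object]:
    """Summarize what a list of object keys says about a Hudi table."""
    initialized = False
    completed = []
    pending = []
    for key in keys:
        path = "/" + key
        idx = path.find(MARKER)
        if idx < 0:
            continue  # data file or unrelated object, not table metadata
        rest = path[idx + len(MARKER):]
        if rest.startswith("metadata/"):
            continue  # Hudi's internal metadata table has its own timeline
        name = rest.rsplit("/", 1)[-1]
        if name == "hoodie.properties":
            initialized = True
        elif name.endswith(COMPLETED_SUFFIXES):
            completed.append(name)
        elif name.endswith(PENDING_SUFFIXES):
            pending.append(name)
    return {
        "table_initialized": initialized,
        "completed": sorted(completed),
        "pending": sorted(pending),
    }


if __name__ == "__main__":
    # Hypothetical listing: the table was initialized but no commit finished.
    sample = [
        "mytable/.hoodie/hoodie.properties",
        "mytable/.hoodie/20230101120000.deltacommit.requested",
        "mytable/.hoodie/20230101120000.deltacommit.inflight",
    ]
    print(summarize_hudi_listing(sample))
```

If the summary shows no `.hoodie` folder at all, or only pending entries, that would suggest the stream never completed a commit at the base path, which is a different situation from data files being written somewhere unexpected.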