2

I am new to this Databricks Autoloader, we have a requirement where we need to process the data from AWS s3 to delta table via Databricks autoloader. I was testing this autoloader so I came across duplicate issue that is if i upload a file with name say emp_09282021.csv having same data as emp_09272021.csv then it is not detecting any duplicate it is simply inserting them so if I had 5 rows in emp_09272021.csv file now it will become 10 rows as I upload emp_09282021.csv file.

below is the code that i tried:

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header",True) \
  .schema("id string,name string, age string,city string") \
  .load("s3://some-s3-path/source/") \
  .writeStream.format("delta") \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "s3://some-s3-path/tgt_checkpoint_0928/") \
  .start("s3://some-s3-path/spark_stream_processing/target/")

any guidance please to handle this?

Alex Ott
  • 80,552
  • 8
  • 87
  • 132
MykG
  • 109
  • 1
  • 11

1 Answers1

3

It's not the task of the autoloader to detect duplicates, it provides you the possibility to ingest data, but you need to handle duplicates yourself. There are several approaches to that:

  • Use built-in dropDuplicates function. It's recommended to use it with watermarking to avoid creating a huge state, but you need to have some column that will be used as event time, and it should be part of dropDuplicate list (see docs for more details):
streamingDf \
  .withWatermark("eventTime", "10 seconds") \
  .dropDuplicates("col1", "eventTime")
  • Use Delta's merge capability - you just need to insert data that isn't in the Delta table, but you need to use foreachBatch for that. Something like this (please note that table should already exist, or you need to add a handling of non-existent table):
from delta.tables import *

def drop_duplicates(df, epoch):
  table = DeltaTable.forPath(spark, 
      "s3://some-s3-path/spark_stream_processing/target/")
   dname = "destination"
   uname = "updates"
   dup_columns = ["col1", "col2"]
   merge_condition = " AND ".join([f"{dname}.{col} = {uname}.{col}"
      for col in dup_columns])
   table.alias(dname).merge(df.alias(uname), merge_condition)\
     .whenNotMatchedInsertAll().execute()

# ....
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header",True) \
  .schema("id string,name string, age string,city string") \
  .load("s3://some-s3-path/source/") \
  .writeStream.foreachBatch(drop_duplicates)\
  .option("checkpointLocation", "s3://some-s3-path/tgt_checkpoint_0928/") \
  .start()

In this code you need to change the dup_columns variable to specify columns that are used to detect duplicates.

Alex Ott
  • 80,552
  • 8
  • 87
  • 132
  • Hey Alex! it looks promising.. thank you! let me test it out. actually we have to bring the data from oracle to delta tables, i did not find any direct streaming option, so we are using AWS DMS CDC to bring data from oracle to S3 and then via autoloader as delta format. can you suggest any other option? like if we can avoid this tricky route.. – MykG Sep 28 '21 at 11:04
  • it's quite standard setup for change data capture (CDC). I'm not familiar with DMS yet, but I believe that it will send you an additional information on what kind of data changes happened - insert/update/delete. In this case you need to use merge to handle this. But maybe this is a topic for a separate question. – Alex Ott Sep 28 '21 at 12:36
  • 1
    yes Alex! I found something, this would help me. https://databricks.com/blog/2019/07/15/migrating-transactional-data-to-a-delta-lake-using-aws-dms.html – MykG Sep 28 '21 at 13:32