When the ETL job runs it executes properly, but because the table has no timestamp column, running the same ETL job again duplicates the data. How can I perform staging and solve this with an upsert (or any other approach)? The only solutions I have found are to add a timestamp column or to use staging; is there another way to get rid of this problem?
2 Answers
You can use overwrite mode while writing data to S3. It will replace the original data.
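For example, a rough sketch using the plain Spark DataFrame writer (`resultDf` and `outputPath` are placeholders for your data and S3 location):

import org.apache.spark.sql.SaveMode

// Overwrite the target S3 prefix on every run so a re-run replaces
// the previous output instead of appending duplicates.
resultDf.write
  .mode(SaveMode.Overwrite)
  .format("parquet")  // or "avro"/"csv" to match the rest of the pipeline
  .save(outputPath)   // e.g. "s3://your-bucket/your-table/"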

Sandeep Fatangare
You may also use the `job bookmark` feature of the Glue job to avoid re-running the Glue job on already processed data. – Sandeep Fatangare Jan 31 '19 at 17:50
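A rough sketch of that bookmark approach (assuming bookmarks are enabled with the `--job-bookmark-option job-bookmark-enable` job parameter; the database and table names below are placeholders): the source read must carry a `transformationContext` so Glue can track what it has already processed.

// Placeholder catalog names; the transformationContext lets job bookmarks
// remember which data this source has already read in previous runs.
val sourceDyf = glueContext.getCatalogSource(
  database = "my_database",
  tableName = "my_table",
  transformationContext = "read_source"
).getDynamicFrame()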
To prevent duplicates on S3 you need to load the data from the destination and filter out existing records before saving:
import com.amazonaws.services.glue.DynamicFrame
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.sql.functions.col

// Keep only records whose "id" is not already present in the destination.
val deltaDf = newDataDf.alias("new")
  .join(existingDf.alias("existing"), col("new.id") === col("existing.id"), "left_outer")
  .where(col("existing.id").isNull)
  .select("new.*")

glueContext.getSinkWithFormat(
  connectionType = "s3",
  options = JsonOptions(Map(
    "path" -> path
  )),
  transformationContext = "save_to_s3",
  format = "avro"
).writeDynamicFrame(DynamicFrame(deltaDf, glueContext))
However, this method doesn't overwrite updated records.
Another option is to also save updated records with an `updated_at` field, which downstream consumers can use to get the latest values.
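A minimal sketch of that pattern (`newDataDf` is the incoming batch and `allRecordsDf` is a placeholder for everything read back from S3; the window-based deduplication is one possible way for a consumer to pick the latest version per id):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, current_timestamp, row_number}

// Writer side: stamp every record with the time it was written and append it.
val stampedDf = newDataDf.withColumn("updated_at", current_timestamp())

// Consumer side: keep only the most recent version of each id.
val latestPerId = Window.partitionBy("id").orderBy(col("updated_at").desc)
val dedupedDf = allRecordsDf
  .withColumn("rn", row_number().over(latestPerId))
  .where(col("rn") === 1)
  .drop("rn")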
You can also consider dumping the dataset into a separate folder each time you run the job (i.e. every day you have a full dump of the data in data/dataset_date=<year-month-day>):
import com.amazonaws.services.glue.DynamicFrame
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.sql.functions._

// Tag each record with the run date and write it under a dataset_date=... partition.
val datedDf = sourceDf.withColumn("dataset_date", current_date())

glueContext.getSinkWithFormat(
  connectionType = "s3",
  options = JsonOptions(Map(
    "path" -> path,
    "partitionKeys" -> Array("dataset_date")
  )),
  transformationContext = "save_to_s3",
  format = "avro"
).writeDynamicFrame(DynamicFrame(datedDf, glueContext))
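If consumers only need the latest snapshot, one possible way to read it back is to filter on the maximum dataset_date (a sketch, assuming `spark` is the active SparkSession, `path` is the same output location, and an Avro data source such as spark-avro is available):

import org.apache.spark.sql.functions.{col, max}

// Read all daily dumps and keep only the most recent one.
val allDumps = spark.read.format("avro").load(path)
val latestDate = allDumps.agg(max("dataset_date")).first().get(0)
val latestDump = allDumps.where(col("dataset_date") === latestDate)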

Yuriy Bondaruk