As far as I can tell from the documentation, I cannot do a specific migration from Delta to Delta Live Tables that I would love to do, but I want to check whether I am missing a solution.
Currently, I have a number of aggregate batch Delta tables that upsert new records daily and keep only a very basic set of columns:
key, first_seen, last_seen
With normal Delta upserts, I do this by grabbing the new data, building a DataFrame with the last_seen information, and then either conditionally updating last_seen for existing keys or inserting the whole row for new keys. So it looks like this:
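```python
from delta.tables import DeltaTable

# existing: the aggregate DeltaTable; summary: DataFrame of key, first_seen, last_seen
(existing.alias("existing")
    .merge(summary.alias("updates"), "existing.key = updates.key")
    # matched keys: only move last_seen forward, never touch first_seen
    .whenMatchedUpdate(
        condition="updates.last_seen > existing.last_seen",
        set={"last_seen": "updates.last_seen"})
    # new keys: insert every column, including first_seen
    .whenNotMatchedInsertAll()
    .execute())
```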
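To make the intended semantics of that MERGE explicit, here is a minimal pure-Python model of it on plain dicts, with no Spark involved. The names Seen and merge_upsert are hypothetical, and dates are assumed to be ISO-8601 strings so that string comparison matches date order. Like a Delta MERGE, it assumes the batch has at most one row per key.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Tuple


@dataclass
class Seen:
    first_seen: str  # assumed ISO-8601 date string, e.g. "2024-01-01"
    last_seen: str


def merge_upsert(existing: Dict[str, Seen],
                 updates: Iterable[Tuple[str, str, str]]) -> Dict[str, Seen]:
    """Model of the conditional upsert; updates are (key, first_seen, last_seen)."""
    # Copy so the input table is left untouched.
    result = {k: Seen(v.first_seen, v.last_seen) for k, v in existing.items()}
    for key, first_seen, last_seen in updates:
        current = result.get(key)
        if current is None:
            # Like whenNotMatchedInsertAll: a new key keeps every column.
            result[key] = Seen(first_seen, last_seen)
        elif last_seen > current.last_seen:
            # Like the conditional whenMatchedUpdate: only last_seen changes.
            current.last_seen = last_seen
        # Matched but not newer: the row is left as it was.
    return result


existing = {"a": Seen("2024-01-01", "2024-01-05")}
batch = [("a", "2024-01-06", "2024-01-06"), ("b", "2024-01-06", "2024-01-06")]
merged = merge_upsert(existing, batch)
print(merged)
```

Key "a" keeps its original first_seen and only advances last_seen, while key "b" is inserted whole.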
I would really like to move this into Delta Live Tables pipelines and change it to an incremental update. Life would be so great. I only want to keep the current information, so this should match SCD Type 1.
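The catch is that a plain SCD Type 1 upsert replaces the whole row. The sketch below is a deliberately simplified, hypothetical model of that behaviour (scd1_overwrite is not the Delta Live Tables implementation): the newest record by last_seen overwrites every column, so first_seen is lost for existing keys.

```python
from typing import Dict, Iterable, Tuple

Row = Tuple[str, str]  # (first_seen, last_seen), assumed ISO-8601 date strings


def scd1_overwrite(existing: Dict[str, Row],
                   updates: Iterable[Tuple[str, str, str]]) -> Dict[str, Row]:
    """Simplified SCD Type 1: the newer record replaces the entire row."""
    result = dict(existing)
    for key, first_seen, last_seen in updates:
        current = result.get(key)
        if current is None or last_seen > current[1]:
            # Every column is overwritten, including first_seen.
            result[key] = (first_seen, last_seen)
    return result


existing = {"a": ("2024-01-01", "2024-01-05")}
overwritten = scd1_overwrite(existing, [("a", "2024-01-06", "2024-01-06")])
print(overwritten)
```

Here key "a" ends up with first_seen "2024-01-06" instead of "2024-01-01", which is exactly what the partial-column update in the MERGE avoids.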
From the Databricks documentation on CDC, I cannot see how to do an upsert on a subset of columns that also performs a whenNotMatchedInsertAll().
Any thoughts?