The Problem
We have a Delta Lake setup on top of ADLS Gen2 with the following tables:
- bronze.DeviceData: partitioned by arrival date (Partition_Date)
- silver.DeviceData: partitioned by event date and hour (Partition_Date and Partition_Hour)
We ingest large amounts of data (>600M records per day) from an event hub into bronze.DeviceData (append-only). We then process the new files in a streaming fashion and upsert them into silver.DeviceData with the Delta MERGE command (see below).
The data arriving in the bronze table can contain data from any partition in silver (e.g. a device may send historic data that it cached locally). However, >90% of the data arriving on any given day belongs to the partitions Partition_Date IN (CURRENT_DATE(), CURRENT_DATE() - INTERVAL 1 DAYS, CURRENT_DATE() + INTERVAL 1 DAYS). Therefore, to upsert the data, we have the following two Spark jobs:
- "Fast": processes the data from the three date partitions above. The latency is important here, so we prioritize this data
- "Slow": processes the rest (anything but these three date partitions). The latency doesn't matter so much, but it should be within a "reasonable" amount of time (not more than a week I'd say)
Now we come to the problem: although the amount of data is orders of magnitude smaller in the "slow" job, it runs for days just to process a single day of slow bronze data, even with a big cluster. The reason is simple: it has to read and update many silver partitions (>1000 date partitions at times), and since the updates are small but the date partitions can be gigabytes in size, these MERGE commands are inefficient.
Furthermore, as time goes on, this slow job will become slower and slower, since the silver partitions it touches keep growing.
Questions
- Are our partitioning scheme and the fast/slow Spark job setup generally a good way to approach this problem?
- What could be done to improve this setup? We would like to reduce the cost and latency of the slow job, and find a way for it to scale with the amount of data arriving in bronze on any given day rather than with the size of the silver table.
Additional Info
- we need the MERGE command, as certain upstream services can re-process historic data, which should then update the silver table as well
- the schema of the silver table:
CREATE TABLE silver.DeviceData (
  DeviceID LONG NOT NULL,              -- the ID of the device that sent the data
  DataType STRING NOT NULL,            -- the type of data it sent
  Timestamp TIMESTAMP NOT NULL,        -- the timestamp of the data point
  Value DOUBLE NOT NULL,               -- the value that the device sent
  UpdatedTimestamp TIMESTAMP NOT NULL, -- the timestamp when the value arrived in bronze
  Partition_Date DATE NOT NULL,        -- = TO_DATE(Timestamp)
  Partition_Hour INT NOT NULL          -- = HOUR(Timestamp)
)
USING DELTA
PARTITIONED BY (Partition_Date, Partition_Hour)
LOCATION '...'
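- for reference, the partition columns are derived from the event Timestamp when a batch is prepared for silver, along these lines (withSilverPartitions is just an illustrative helper name):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, hour, to_date}

// derive the silver partition columns from the event timestamp,
// matching the schema comments above
def withSilverPartitions(df: DataFrame): DataFrame =
  df.withColumn("Partition_Date", to_date(col("Timestamp")))
    .withColumn("Partition_Hour", hour(col("Timestamp")))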
- our MERGE command:
import io.delta.tables.DeltaTable

val silverTable = DeltaTable.forPath(spark, silverDeltaLakeDirectory)
val batch = ... // the streaming update batch

// the dates and hours that we want to upsert, for partition pruning
// collected from the streaming update batch
val dates = "..."
val hours = "..."

val mergeCondition = s"""
  silver.Partition_Date IN ($dates)
  AND silver.Partition_Hour IN ($hours)
  AND silver.Partition_Date = batch.Partition_Date
  AND silver.Partition_Hour = batch.Partition_Hour
  AND silver.DeviceID = batch.DeviceID
  AND silver.Timestamp = batch.Timestamp
  AND silver.DataType = batch.DataType
"""

silverTable.alias("silver")
  .merge(batch.alias("batch"), mergeCondition)
  // only update if the incoming event is newer
  .whenMatched("batch.UpdatedTimestamp > silver.UpdatedTimestamp").updateAll()
  .whenNotMatched().insertAll()
  .execute()
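- the dates and hours strings above are the distinct partition values collected from the batch; a sketch of how that can look:

import org.apache.spark.sql.functions.col

// collect the distinct partition values present in the batch and
// render them as literals for the pruning predicates above
val dates = batch.select(col("Partition_Date")).distinct().collect()
  .map(row => s"'${row.getDate(0)}'").mkString(", ")
val hours = batch.select(col("Partition_Hour")).distinct().collect()
  .map(row => row.getInt(0).toString).mkString(", ")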