I am specifically looking to optimize the performance of updating and inserting data into a Delta Lake base table that holds about 4 trillion records.
Environment:
- Spark 3.0.0
- DeltaLake 0.7.0
For context, this is about building an incremental table with Delta Lake. Summarized in steps:
- Create the base (Delta) table
- Obtain the periodic data
- Add the periodic data to the base table
Steps 1 and 2 are already done, but adding the data is notoriously slow: for example, merging a 9 GB CSV takes about 6 hours, mainly because Delta has to rewrite the affected data files for each update, and it also has to "read" all the existing data in the table.
The table is partitioned (PARTITIONED BY) and stored on the cluster's HDFS so that the Spark nodes can perform the operations.
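For reference, a simplified sketch of how step 2 currently looks, i.e. reading the periodic CSV and deriving the calculated YEAR/MONTH/DAY columns before it is merged into the base table (the path, the timestamp parsing, and the view name here are placeholders, not the real ones):
```python
from pyspark.sql import functions as F

# Placeholder path; in reality a ~9 GB CSV arrives roughly every 5 days
incremental_df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/data/incoming/latest_batch.csv")
    # Make sure the local date is a timestamp, then derive the partition columns
    .withColumn("FECHA_LOCAL", F.to_timestamp("FECHA_LOCAL"))
    .withColumn("YEAR", F.year("FECHA_LOCAL"))
    .withColumn("MONTH", F.month("FECHA_LOCAL"))
    .withColumn("DAY", F.dayofmonth("FECHA_LOCAL"))
)

# Registered as a view so the SQL MERGE below can reference it
incremental_df.createOrReplaceTempView("INCREMENTAL_TABLE")
```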
The fields of the base table:
Name | Type | Cardinality | Comment |
---|---|---|---|
ID | int | 10000 | Identifier |
TYPE | string | 30 | |
LOCAL_DATE | date | | Local date of the record |
DATE_UTC | date | | UTC date of the record |
VALUE | int | | Record value |
YEAR | int | 4 | Calculated column |
MONTH | int | 12 | Calculated column |
DAY | int | 31 | Calculated column |
As the typical search is by time, it was decided to partition by the LOCAL_DATE column, split into YEAR, MONTH and DAY. Partitioning directly by the ID and LOCAL_DATE columns was ruled out due to their high cardinality (which is worse for performance purposes), and finally TYPE was added, resulting in the following:
spark.sql(f"""
CREATE OR REPLACE TABLE {TABLE_NAME} (
ID INT,
FECHA_LOCAL TIMESTAMP,
FECHA_UTC TIMESTAMP,
TIPO STRING,
VALUE DOUBLE,
YEAR INT,
MONTH INT,
DAY INT )
USING DELTA
PARTITIONED BY (YEAR , MONTH , DAY, TIPO)
LOCATION '{location}'
""")
From now on, the incremental load consists of periodically adding one of these CSV files of approximately 9 GB every 5 days. Currently the MERGE operation is as follows:
spark.sql(f"""
MERGE INTO {BASE_TABLE_NAME}
USING {INCREMENTAL_TABLE_NAME} ON
--partitioned cols
{BASE_TABLE_NAME}.YEAR = {INCREMENTAL_TABLE_NAME}.YEAR AND
{BASE_TABLE_NAME}.MONTH = {INCREMENTAL_TABLE_NAME}.MONTH AND
{BASE_TABLE_NAME}.DAY = {INCREMENTAL_TABLE_NAME}.DAY AND
{BASE_TABLE_NAME}.TIPO = {INCREMENTAL_TABLE_NAME}.TIPO AND
{BASE_TABLE_NAME}.FECHA_LOCAL = {INCREMENTAL_TABLE_NAME}.FECHA_LOCAL AND
{BASE_TABLE_NAME}.ID = {INCREMENTAL_TABLE_NAME}.ID
WHEN MATCHED THEN UPDATE
SET {BASE_TABLE_NAME}.VALUE = {INCREMENTAL_TABLE_NAME}.VALUE,
{BASE_TABLE_NAME}.TIPO = {INCREMENTAL_TABLE_NAME}.TIPO
WHEN NOT MATCHED THEN INSERT *
""")
Some facts to consider:
- The time of this MERGE operation is 6 hours
- The base table was created from 230GB CSV data (55GB now in delta!)
- The Spark application runs in cluster mode with the parameters below
- The infrastructure consists of 3 nodes, each with 32 cores and 250 GB of RAM, although for safety the application only takes roughly 50% of those resources, leaving the rest for the other applications already running on the cluster
Spark app:
mode = 'spark://spark-master:7077'
# mode = 'local[*]'
spark = (
SparkSession.builder
.master(mode)
.appName("SparkApp")
.config('spark.cores.max', '45')
.config('spark.executor.cores', '5')
.config('spark.executor.memory', '11g')
.config('spark.driver.memory', '120g')
.config("spark.sql.shuffle.partitions", "200") # 200 only for 200GB delta table reads
.config("spark.storage.memoryFraction", "0.8")
# DeltaLake configs
.config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
# Delta optimization
.config("spark.databricks.delta.optimizeWrite.enabled", "true")
.config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
.getOrCreate()
)
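A side doubt: as far as I know, spark.storage.memoryFraction is a legacy setting from before Spark 1.6, so with the unified memory manager in Spark 3.0 it probably has no effect. If memory tuning matters for the merge, I assume the modern equivalents would be set roughly like this (a minimal sketch; the values are only guesses, not tested):
```python
from pyspark.sql import SparkSession

# Sketch of the unified memory settings that replace the legacy
# spark.storage.memoryFraction in Spark 3.x; values are illustrative guesses
spark = (
    SparkSession.builder
    .master('spark://spark-master:7077')
    .appName("SparkApp")
    .config("spark.memory.fraction", "0.8")         # heap share for execution + storage
    .config("spark.memory.storageFraction", "0.5")  # part of that protected for caching
    .getOrCreate()
)
```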