I have a Spark DataFrame in a Databricks cluster with 5 million rows. What I want is to cache this DataFrame and then apply .count() so that the next operations run extremely fast. I have done this in the past with 20,000 rows and it works. However, when I tried it here I ran into the following paradox:
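(To be clear about the pattern I mean, here is a toy sketch of "cache then count" with made-up data, not my real pipeline:)

# toy sketch: cache() is lazy, so the count() is what actually materializes the cached data
toy_df = spark.range(20000).withColumnRenamed("id", "row_id")
toy_df.cache()    # marks the DataFrame for caching; nothing is computed yet
toy_df.count()    # triggers the job that populates the cache
toy_df.groupBy().sum("row_id").show()    # later actions should now read from the cache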
Dataframe creation
Step 1: Read 8 million rows from the Azure Data Lake storage account
# list_of_paths[0]='abfss://storage_container_name@storage_account_name.dfs.core.windows.net/folder_1/folder_2/0/2020/06/02/00/00/27.avro'
read_avro_data = spark.read.format("avro").load(list_of_paths)
avro_decoded = read_avro_data.withColumn('Body_decoded', sql_function.decode(read_avro_data.Body, charset="UTF-8")).select("Body_decoded")
datalake_spark_dataframe = datalake_spark_dataframe.union(
    avro_decoded.withColumn("Body_decoded", sql_function.from_json("Body_decoded", schema))
                .select(*['Body_decoded.{}'.format(x) for x in columns_selected]))
datalake_spark_dataframe.printSchema()
"root
|-- id: string (nullable = true)
|-- BatteryPercentage: float (nullable = true)
|-- SensorConnected: integer (nullable = false)
|-- TemperatureOutside: float (nullable = true)
|-- ReceivedOn: string (nullable = true)"
datalake_spark_dataframe.rdd.getNumPartitions() # 635 partitions
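In case it helps, here is what a single batch goes through end-to-end. This is only a self-contained sketch with a placeholder schema and column list (they just mirror the printSchema() output above), not my exact code:

from pyspark.sql import functions as sql_function
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# placeholder schema/columns matching the printed schema above
schema = StructType([
    StructField("id", StringType()),
    StructField("BatteryPercentage", FloatType()),
    StructField("SensorConnected", IntegerType()),
    StructField("TemperatureOutside", FloatType()),
    StructField("ReceivedOn", StringType())])
columns_selected = ["id", "BatteryPercentage", "SensorConnected", "TemperatureOutside", "ReceivedOn"]

raw = spark.read.format("avro").load(list_of_paths)      # Avro files with a binary Body column
decoded = raw.withColumn("Body_decoded", sql_function.decode("Body", "UTF-8")).select("Body_decoded")
parsed = decoded.withColumn("Body_decoded", sql_function.from_json("Body_decoded", schema)) \
                .select(*["Body_decoded.{}".format(c) for c in columns_selected])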
This DataFrame has 8 million rows. With 8 million rows my application runs pretty well, but I wanted to stress test it in a big-data environment, because 8 million rows is not big data. So I replicated my 8-million-row Spark DataFrame 287 times, giving roughly 2.2 billion rows. To do the replication I did the following:
Step 2: Replicate the 8-million-row DataFrame
datalake_spark_dataframe_new = datalake_spark_dataframe

for i in range(287):
    print(i)
    datalake_spark_dataframe_new = datalake_spark_dataframe_new.union(datalake_spark_dataframe)
    print("done on iteration: {0}".format(i))
datalake_spark_dataframe_new.rdd.getNumPartitions() #182880
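That partition count makes sense to me, since union just concatenates the partitions of its inputs: 288 copies (the original plus 287 unions) of 635 partitions gives 288 * 635 = 182,880 partitions.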
With the final 2.2-billion-row DataFrame, I applied a time-window GroupBy to my data, ending up with a few million rows. At the top of my question I wrote that the grouped dataset has approximately 5 million rows.
Step 3: GroupBy the 2.2-billion-row DataFrame on a 6-hour time window & apply .cache() and .count()
%sql set spark.sql.shuffle.partitions=100
import pyspark.sql.functions as sql_function
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, BooleanType, DateType, DoubleType, ArrayType
datalake_spark_dataframe_downsampled = datalake_spark_dataframe_new.withColumn(
    timestamp_column, sql_function.to_timestamp(timestamp_column, "yyyy-MM-dd HH:mm"))

datalake_spark_dataframe_downsampled = datalake_spark_dataframe_downsampled \
    .groupBy("id", sql_function.window("ReceivedOn", "{0} minutes".format(time_interval))) \
    .agg(
        sql_function.mean("BatteryPercentage").alias("BatteryPercentage"),
        sql_function.mean("SensorConnected").alias("OuterSensorConnected"),
        sql_function.mean("TemperatureOutside").alias("averageTemperatureOutside"))

columns_to_drop = ['window']
datalake_spark_dataframe_downsampled = datalake_spark_dataframe_downsampled.drop(*columns_to_drop)
# From 2.2 billion rows down to 5 million rows after the GroupBy...
datalake_spark_dataframe_downsampled.repartition(100)
datalake_spark_dataframe_downsampled.cache()
datalake_spark_dataframe_downsampled.count() # job execution takes for ever
datalake_spark_dataframe_downsampled.rdd.getNumPartitions() #100 after re-partition
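For reference, a quick way to confirm the DataFrame is at least marked for caching (I'm assuming these DataFrame attributes behave the same on Databricks as in open-source PySpark):

datalake_spark_dataframe_downsampled.is_cached       # whether cache()/persist() has been called on it
datalake_spark_dataframe_downsampled.storageLevel    # the StorageLevel that cache() selected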
Spark UI before running the .count()
Spark UI during the count execution
When I apply the following commands to my Spark DataFrame, the task takes more than 3 hours to complete and in the end fails.
I want to add that the job showed the same execution-time behavior before and after the re-partitioning. I did the re-partitioning in case the default values were making the job run very slowly, and I kept adding partitions in case that would make the job execute faster.
%sql set spark.sql.shuffle.partitions=1000000
datalake_spark_dataframe_downsampled.repartition(1000000)
datalake_spark_dataframe_downsampled.cache()
datalake_spark_dataframe_downsampled.count()
Below is the output of the Spark job:
The error I get:
My cluster resources:
As you can see, it's not a matter of RAM or CPU cores, as I have plenty of them. Why does the job split into only 5 stages even after I apply re-partitioning? And how can I split the jobs so that the .cache() and .count() commands run faster, given my 48 vCPU cores?
Screenshots provided per job execution:
Execution on 80 million rows (8m * 10 iterations = 80m rows)