
I have a Spark DataFrame in a Databricks cluster with 5 million rows. I want to cache this DataFrame and then apply .count(), so that subsequent operations run very fast. I have done this in the past with 20,000 rows and it works. However, when I tried it here I ran into the following paradox:
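
For reference, this is the pattern I mean (a minimal sketch; the DataFrame name is just a placeholder):

# Cache the DataFrame and trigger materialization with a count(), so that
# later actions can reuse the in-memory copy instead of recomputing the plan.
some_spark_dataframe.cache()    # lazy: only marks the DataFrame for caching
some_spark_dataframe.count()    # action that actually materializes the cache
# subsequent actions (joins, aggregations, ML) should now read from memory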

Dataframe creation

Step 1: Read 8 million rows from an Azure Data Lake Storage account

# list_of_paths[0] = 'abfss://storage_container_name@storage_account_name.dfs.core.windows.net/folder_1/folder_2/0/2020/06/02/00/00/27.avro'
read_avro_data = spark.read.format("avro").load(list_of_paths)

# Decode the Avro Body column into a UTF-8 string, parse it as JSON against the
# schema, and keep only the selected columns. (The union assumes
# datalake_spark_dataframe was already initialised earlier, which is not shown here.)
avro_decoded = read_avro_data.withColumn(
    "Body_decoded", sql_function.decode(read_avro_data.Body, charset="UTF-8")
).select("Body_decoded")

datalake_spark_dataframe = datalake_spark_dataframe.union(
    avro_decoded.withColumn("Body_decoded", sql_function.from_json("Body_decoded", schema))
                .select(*["Body_decoded.{}".format(x) for x in columns_selected])
)

datalake_spark_dataframe.printSchema()
"root
 |-- id: string (nullable = true)
 |-- BatteryPercentage: float (nullable = true)
 |-- SensorConnected: integer (nullable = false)
 |-- TemperatureOutside: float (nullable = true)
 |-- ReceivedOn: string (nullable = true)"

datalake_spark_dataframe.rdd.getNumPartitions() # 635 partitions

This DataFrame has 8 million rows. With 8 million rows my application runs well, but I wanted to stress test it in a big-data environment, since 8 million rows is not big data. So I replicated the 8-million-row Spark DataFrame 287 times, ending up with roughly 2.2 billion rows. To do the replication I did the following:

Step 2: Replicate the 8-million-row DataFrame

# Repeatedly union the DataFrame with itself. Each iteration is lazy and cheap
# here, but it adds another branch to the query lineage.
datalake_spark_dataframe_new = datalake_spark_dataframe
for i in range(287):
  print(i)
  datalake_spark_dataframe_new = datalake_spark_dataframe_new.union(datalake_spark_dataframe)
  print("done on iteration: {0}".format(i))

datalake_spark_dataframe_new.rdd.getNumPartitions()  # 182880 partitions (635 partitions x 288 copies)

With the final 2.2-billion-row DataFrame, I applied a time-window groupBy, ending up with a few million rows. That grouped dataset is the roughly 5-million-row DataFrame I mentioned at the top of the question.

Step 3: Group the 2.2-billion-row DataFrame by a time window of 6 hours, then apply .cache() and .count()

%sql set spark.sql.shuffle.partitions=100

import pyspark.sql.functions as sql_function
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, BooleanType, DateType, DoubleType, ArrayType

# Parse the timestamp column, then aggregate per id over a time window
# (time_interval corresponds to the 6-hour window mentioned above, i.e. 360 minutes).
datalake_spark_dataframe_downsampled = datalake_spark_dataframe_new.withColumn(
    timestamp_column, sql_function.to_timestamp(timestamp_column, "yyyy-MM-dd HH:mm"))
datalake_spark_dataframe_downsampled = datalake_spark_dataframe_downsampled \
    .groupBy("id", sql_function.window("ReceivedOn", "{0} minutes".format(time_interval))) \
    .agg(
        sql_function.mean("BatteryPercentage").alias("BatteryPercentage"),
        sql_function.mean("SensorConnected").alias("OuterSensorConnected"),
        sql_function.mean("TemperatureOutside").alias("averageTemperatureOutside"))

columns_to_drop = ['window']
datalake_spark_dataframe_downsampled = datalake_spark_dataframe_downsampled.drop(*columns_to_drop)

# From 2.2 billion rows down to 5 million rows after the GroupBy...
# Note: repartition() returns a new DataFrame, so the result must be assigned back.
datalake_spark_dataframe_downsampled = datalake_spark_dataframe_downsampled.repartition(100)
datalake_spark_dataframe_downsampled.cache()   # lazy: only marks the DataFrame for caching
datalake_spark_dataframe_downsampled.count()   # this action triggers the whole lineage; the job takes forever

datalake_spark_dataframe_downsampled.rdd.getNumPartitions()  # 100 after the repartition

Spark UI before the .count() (screenshot)

Spark UI during the count execution (screenshot)

When I apply the following commands to my Spark DataFrame, the task runs for more than 3 hours and in the end fails.

I want to add that the job behaved the same, in terms of execution time, before and after the repartitioning. I only repartitioned in case the default values were making the job run very slow, and I kept adding partitions hoping the job would execute faster.

%sql set spark.sql.shuffle.partitions=1000000
# repartition() returns a new DataFrame, so assign the result back.
datalake_spark_dataframe_downsampled = datalake_spark_dataframe_downsampled.repartition(1000000)

datalake_spark_dataframe_downsampled.cache()
datalake_spark_dataframe_downsampled.count()

Below is the output of the Spark job (screenshot).

The error I get (screenshot): the job fails reporting that spark.driver.maxResultSize (4 GB) was exceeded.

My cluster resources (screenshot):

As you can see, it is not a matter of RAM or CPU cores; I have plenty of both. Why does the job split into only 5 stages, even after I apply repartitioning? And how can I split the work so that the .cache() and .count() commands run faster on my 48 vCPU cores?


Screenshots provided per job execution: execution on 80 million rows (8M rows x 10 iterations = 80M rows) (screenshot).

NikSp
  • @Srinivas data skew may indeed be a possible scenario here, because if in the initial dataset an id appears 500,000 times, multiplying the same values 287 times gives me about 143 million identical rows for one id... Grouping them may cause heavy skew. I know that multiplying the same information many times might be bad practice, but it was the quickest way to stress test my app in big-data scenarios – NikSp Jun 02 '20 at 08:20
  • @Srinivas based on your answer below, should I compute the number of re-partitions as: 182880/200 = 914.4....thus 50 // 914.4 ~0 partitions? Is this feasible? 182880 is the number of partitions of the 2.2 billion rows df – NikSp Jun 02 '20 at 09:40
  • My Spark df from the data lake has 635 partitions with 8 million rows... I posted it earlier... So the number of partitions is: 635/200 ~ 3.175, thus 50 cores // 3.175 ~ 15? – NikSp Jun 02 '20 at 10:03
  • @Srinivas Ok can you also please check my Spark UI during count execution I have just uploaded an image....Why the input 109.8GB and the Shuffle write 3.9GB are changing during the execution? – NikSp Jun 02 '20 at 10:07
  • @Srinivas ok and one more thing that I noticed. Even though from my screen shot you can validate that the job is running it now just stopped and I got an error about the spark.driver.maxResultSize 4Gb was exceeded even though I have already configured this to 15Gb using the command ```spark.conf.set("spark.driver.maxResultSize", "15g")```....for more look the screenshot I have uploaded with the error. – NikSp Jun 02 '20 at 10:36
  • I see the main problem with this line: for i in range(287). You can try reducing it to 10, see how long that takes, and slowly increase the number based on the results you get after execution. – Srinivas Jun 02 '20 at 10:40
  • Please delete old comments, otherwise writing comments will be blocked for 24 hours if this thread grows bigger. – Srinivas Jun 02 '20 at 10:43
  • @Srinivas ok, I will try the for loop change you suggested. But how does this relate to the 4 GB maxResultSize? For some reason the 15 GB change does not take effect – NikSp Jun 02 '20 at 10:59
  • check this - https://stackoverflow.com/questions/39087859/what-is-spark-driver-maxresultsize – Srinivas Jun 02 '20 at 11:02
  • @Srinivas Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/215160/discussion-between-niksp-and-srinivas). – NikSp Jun 02 '20 at 11:22
  • @Srinivas please join our last discussion room...I have sent you an invite – NikSp Jun 04 '20 at 06:42

2 Answers


I had a similar issue in the past while iterating through a for loop, since my number of iterations is dynamic and depends on the input combination.

I resolved the performance issue by persisting the data on each iteration (you can persist to ADLS2, or to HDFS / Hive tables if on-prem). In the next iteration, read from that location again, union, and overwrite the same location. There is network lag and it is not the most efficient approach, but it still brought execution time down by about 10x.

A possible reason is Spark lineage (I believe that on every iteration it re-executes all previous iterations again and again). Persisting the data with overwrite breaks that lineage. I tried cache() and other options as well, but they did not help me.

Edit #1: Try something like this

datalake_spark_dataframe_new=datalake_spark_dataframe
# Write the base DataFrame out once, then on each iteration read the
# accumulated data back, union it with the base DataFrame and overwrite the
# same location. (Depending on your Spark version, you may need to cache or
# checkpoint the read DataFrame before overwriting a path you are also reading from.)
datalake_spark_dataframe.write.mode("overwrite").option("header", "true").format("parquet").save("abfss://<ADLS_PATH>")
for i in range(287):
  print(i)
  datalake_spark_dataframe_new=spark.read.parquet("abfss://<ADLS_PATH>")
  datalake_spark_dataframe_new.union(datalake_spark_dataframe).write.mode("overwrite").option("header", "true").format("parquet").save("abfss://<ADLS_PATH>")
  print("done on iteration: {0}".format(i))

Edit #2: This should be more efficient than the previous version,

# Append the base DataFrame to the same location 287 times, then read the
# accumulated result back as a single DataFrame.
for i in range(287):
  print(i)
  datalake_spark_dataframe.write.mode("append").option("header", "true").format("parquet").save("abfss://<ADLS_PATH>")
  print("done on iteration: {0}".format(i))

datalake_spark_dataframe_new=spark.read.parquet("abfss://<ADLS_PATH>")
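
Once the data sits in Parquet, a count on the re-read DataFrame only scans the files instead of replaying the union lineage. A minimal sketch of the follow-up step (assuming the same <ADLS_PATH> placeholder as above):

# Hypothetical follow-up: materialize and count the persisted result.
datalake_spark_dataframe_new = spark.read.parquet("abfss://<ADLS_PATH>")
datalake_spark_dataframe_new.cache()   # optional; the Parquet scan alone is already much cheaper
datalake_spark_dataframe_new.count()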
  • What I tried so far is the re-partitioning. Indeed, caching does not help. Could you please post a more clear answer of your approach with ADLS2 or in On-Prem then HDFS / Hive Tables, with code samples in PySpark so I can replicate and test it to my application? – NikSp Jun 04 '20 at 05:31
  • Note: All the ADLS_PATH values should be same location in the code snippet. – viswanathanr Jun 04 '20 at 13:25
  • Appreciate your response. So, let's say that in the end the final Spark df is 2.2 billion rows. Is the execution time of .cache() and .count() on this 2.2-billion-row datalake_spark_dataframe_new going to be much faster? Because this is my question, not how to execute the for loop faster. – NikSp Jun 04 '20 at 13:29
  • I understand your concern. I am giving you a different thought: persist the 2.2 billion rows and then do the count to see whether that helps. I believe the datalake_spark_dataframe_new lineage will actually be executed during your action call of repartition / cache / count. Hence the for loop could be your bottleneck. – viswanathanr Jun 04 '20 at 13:56
  • In any case I will try your way today or tomorrow. For some reason my Databricks cluster couldn't start this morning, so I am waiting right now for it to come up. I will keep you posted about the results. – NikSp Jun 04 '20 at 13:59
  • Hey @viswanathanr hope you are doing well. I would like to ask you something about the code you posted (Edition 2)...I have tested your approach not with a for loop but with the overwrite mode ```.mode("overwrite")``` instead of ```.mode("append")```. I wanted to ask you if this approach of saving the spark df and then reading again from abfss is faster than using .cache(). Will the computations on this spark df will be faster without using the .cache() command but rather only saving it and reading it on abfss? – NikSp Jun 18 '20 at 07:39
  • From what I have tested, saving the Spark df in ABFSS and then applying .count() indeed runs as fast as if the dataframe were cached in the Spark executors' memory. Although, I need your opinion in case this result is just luck. – NikSp Jun 18 '20 at 07:53
  • To clarify my two comments above: the question is whether saving the Spark df to ABFSS is much faster than applying .cache() on the Spark executors' memory... – NikSp Jun 18 '20 at 08:05
  • Yes. For this scenario, persisting will aid performance. Basically, cache() in a loop might cache all previous iterations cumulatively rather than continuing from the previous cache(). So with cache() inside the iteration, I saw the time only increasing exponentially. This is based on my observation of a similar scenario that I faced. – viswanathanr Jul 13 '20 at 17:12
  • Ok, that's a good thing. I have also observed something interesting: when I saved to ABFSS instead of using the .cache command, the trigger actions that followed, like .collect(), .count(), and Spark ML, executed much faster when the dataframe had been saved in Parquet format in ABFSS. So apart from the loop advantages of persisting tables on ABFSS, there is also a time improvement in Spark trigger execution times. – NikSp Jul 13 '20 at 17:57

I think you have used a very large shuffle partition number (1,000,000), and that is why the job is taking so long to complete.

I would follow the logic below to calculate the shuffle partition count based on data size. For example:

Say the 5 million rows come to around 20 GB of data.

shuffle stage input = 20 GB

So the total number of shuffle partitions is 20000 MB / 200 MB = 100 (targeting roughly 200 MB per partition).

Now assume there are only 50 cores in the cluster; in that case the shuffle partition value would be 50. With 200 cores in the cluster, the shuffle partition value would be 200.

If you choose a very high shuffle partition value, there will be a lot of shuffled data and small tasks, so the job will take more time to complete or may sometimes even fail.

spark.sql.shuffle.partitions=50 // 50 or 100 is the better option here.
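
To make that arithmetic concrete, here is a rough sizing sketch (the 20 GB input and the ~200 MB-per-partition target are just the assumptions used above):

# Rough sizing sketch based on the logic above; all numbers are assumptions.
shuffle_input_mb = 20000            # ~20 GB of shuffle stage input
target_partition_mb = 200           # aim for roughly 200 MB per shuffle partition
size_based_partitions = shuffle_input_mb // target_partition_mb   # = 100

# Set it to the size-based value, or align it with the number of cores in the
# cluster as suggested above; either way it stays in the tens or hundreds,
# not in the millions.
spark.conf.set("spark.sql.shuffle.partitions", size_based_partitions)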

Srinivas
  • Thank you for the answer. That's actually a good point about calculating the correct number of partitions. But I want to add, which I forgot, that before and after the repartitioning the job showed the same execution-time behaviour. So I did the repartitioning in case the default values were making the job run slowly, and I kept adding partitions in case the job would execute faster – NikSp Jun 02 '20 at 06:19
  • From the screenshot only one stage is running... check for data skewness? And can you post the full code of how you are creating the dataframe? – Srinivas Jun 02 '20 at 06:38
  • How can I increase the number of stages running? Can I split the stages into more, in order to handle more data faster? – NikSp Jun 02 '20 at 06:39
  • Also post Spark UI screenshots of the jobs & stages – Srinivas Jun 02 '20 at 06:40
  • To check that, I want to see how you are creating or reading data into the dataframe; can you post the full code if possible – Srinivas Jun 02 '20 at 06:42
  • Ok, let me update my question with the information you requested. – NikSp Jun 02 '20 at 06:47