I'm using Machine Learning Workspace in Cloudera Data Platform (CDP). I created a session with 4vCPU/16 GiB Memory and enabled Spark 3.2.0.

I'm using Spark to load one month of data (about 12 GB in total), apply some transformations, and then write the result as Parquet files to AWS S3.

My Spark session configuration looks like this:

SparkSession
         .builder
         .appName(appName)
         .config("spark.driver.memory", "8G")
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.dynamicAllocation.minExecutors", "4")
         .config("spark.dynamicAllocation.maxExecutors", "20")
         .config("spark.executor.cores", "4")
         .config("spark.executor.memory", "8G")
         .config("spark.sql.shuffle.partitions", 500)
......

Before the data is written to Parquet files, it is repartitioned. As @Koedlt suggested, I corrected the "salt" column.

old:

import random
from pyspark.sql.functions import lit

(df.withColumn("salt", lit(random.randrange(100)))  # bug: one constant salt for all rows
   .repartition("date_year", "date_month", "date_day", "salt")
   .drop("salt").write.partitionBy("date_year", "date_month")
   .mode("overwrite").parquet(SOME__PATH))

new:

from pyspark.sql.functions import floor, rand

(df.withColumn("salt", floor(rand() * 100))  # per-row salt in the range 0..99
   .repartition("date_year", "date_month", "date_day", "salt")
   .drop("salt").write.partitionBy("date_year", "date_month")
   .mode("overwrite").parquet(SOME__PATH))

The Spark transformations run successfully, but the job always fails in the last step, when writing the data to Parquet files.

Below is an example of the error message:

23/01/15 21:10:59 678 ERROR TaskSchedulerImpl: Lost executor 2 on 100.100.18.155: 
The executor with id 2 exited with exit code -1(unexpected).
The API gave the following brief reason: Evicted
The API gave the following message: Pod ephemeral local storage usage exceeds the total limit of containers 10Gi. 

I don't think the problem is in my Spark configuration. The problem seems to be the Kubernetes ephemeral local storage limit, which I do not have permission to change.

Can someone explain why this happens and what a possible solution is?

Ryan

1 Answer

I see an issue in this line:

df.withColumn("salt", lit(random.randrange(100)))

When you do this, random.randrange(100) is evaluated only once, in Python on the driver, and lit() turns that single value into a constant column. Every row gets the same salt, so you're effectively not salting at all and your original data skew remains. That skew may be at the root of your ephemeral local storage issue.

You need to use the pyspark.sql.functions.rand function, which generates a random value per row, to salt properly.

Let's show a small example. With the following simple input data:

df = spark.createDataFrame(
    [
        (1, 1, "ABC"),
        (1, 2, "BCD"),
        (1, 3, "DEF"),
        (2, 1, "EFG"),
        (2, 2, "GHI"),
        (2, 3, "HIJ"),
        (3, 1, "EFG"),
        (3, 2, "BCD"),
        (3, 3, "HIJ"),
    ],
    ["KEY", "ORDER", "RESP"]
)

Doing what you were doing:

df.withColumn("salt", lit(random.randrange(100))).show()
+---+-----+----+----+
|KEY|ORDER|RESP|salt|
+---+-----+----+----+
|  1|    1| ABC|  86|
|  1|    2| BCD|  86|
|  1|    3| DEF|  86|
|  2|    1| EFG|  86|
|  2|    2| GHI|  86|
|  2|    3| HIJ|  86|
|  3|    1| EFG|  86|
|  3|    2| BCD|  86|
|  3|    3| HIJ|  86|
+---+-----+----+----+

Whereas using the proper PySpark functions:

df.withColumn("salt", floor(rand() * 100)).show()
+---+-----+----+----+
|KEY|ORDER|RESP|salt|
+---+-----+----+----+
|  1|    1| ABC|  66|
|  1|    2| BCD|  40|
|  1|    3| DEF|  99|
|  2|    1| EFG|  55|
|  2|    2| GHI|  23|
|  2|    3| HIJ|  41|
|  3|    1| EFG|  61|
|  3|    2| BCD|   0|
|  3|    3| HIJ|  33|
+---+-----+----+----+
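
To make the effect on repartitioning concrete, here is a plain-Python sketch (no Spark required) that counts how many distinct (date_day, salt) keys each approach produces. The row count, the 30-day spread, and the variable names are made up for illustration; they are not taken from the question's data.

```python
import random

random.seed(0)  # fixed seed only so the sketch is repeatable

# Hypothetical stand-in for the real data: 10,000 rows spread over 30 days.
days = [random.randint(1, 30) for _ in range(10_000)]

# Analogue of lit(random.randrange(100)): one value, computed once, shared by all rows.
constant_salt = random.randrange(100)
keys_constant = {(day, constant_salt) for day in days}

# Analogue of floor(rand() * 100): a fresh value drawn for every row.
keys_random = {(day, random.randrange(100)) for day in days}

print(len(keys_constant))  # at most 30: one key per day, same as without a salt
print(len(keys_random))    # up to 30 * 100 distinct keys
```

With the constant salt, the number of distinct keys equals the number of distinct days, so rows are grouped exactly as they would be without a salt. With a per-row salt, each day is split across many keys, which Spark then hashes into spark.sql.shuffle.partitions partitions.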
Koedlt
  • @Koedlt: I corrected the wrong salt column with your suggestion. Unfortunately, the OOM issue is still there. I think the problem is not with Spark but with the Kubernetes pod ephemeral local storage. It seems that while writing the Parquet files, a lot of ephemeral storage is used, exceeding the limit defined in the ephemeral local storage configuration. – Ryan Jan 16 '23 at 15:13
  • Hmmm are you sure that your `date_year` and `date_month` columns are not skewed? What happens if you remove this line: `.drop("salt").write.partitionBy("date_year", "date_month")`? – Koedlt Jan 16 '23 at 15:26
  • Hi Koedlt, I'm sure that the date_year and date_month columns are not skewed. I added the salt column to get more partitions, because I thought partitions with too much data were causing the OOM problem. I tried without repartitioning and with repartition(500); neither worked. I also tried repartitioning and counting the DataFrame first, then writing to Parquet files: the repartition with count succeeded, and the error happened again in the step of writing the Parquet files. – Ryan Jan 16 '23 at 15:36
  • Ok interesting! That makes me think the ephemeral storage issue comes from the actual size of your Parquet output. Where are you writing your Parquet files to? – Koedlt Jan 16 '23 at 16:02
  • The Parquet files are written to an AWS S3 folder, s3a://xxxx. – Ryan Jan 16 '23 at 16:18
  • What happens when you do `df.write.mode("overwrite").parquet("s3a://xxxx")` without any of the repartitioning/salting/dropping columns? – Koedlt Jan 16 '23 at 16:21
  • At the beginning I did it like this, without any repartitioning/salting/dropping columns. Then I got OOM errors; I tried to fix them with repartitioning/salting, but nothing helped. – Ryan Jan 16 '23 at 16:33
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/251180/discussion-between-koedlt-and-ryan). – Koedlt Jan 16 '23 at 16:40
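
One possible explanation, offered as an assumption rather than something confirmed in this thread: on Kubernetes, Spark's shuffle and spill files and the S3A connector's upload buffers are both written to pod-local disk by default, and that disk counts toward the pod's ephemeral-storage limit. The sketch below collects two S3A settings that might reduce local disk use during the write. The names `candidate_settings` and `apply_settings` are hypothetical, the values are starting points to experiment with, and whether a CDP session is allowed to override these keys is not known here.

```python
# Candidate settings to experiment with (assumptions, not a confirmed fix).
candidate_settings = {
    # Buffer S3A upload blocks in off-heap memory instead of local disk
    # (the Hadoop default for this key is "disk").
    "spark.hadoop.fs.s3a.fast.upload.buffer": "bytebuffer",
    # Limit how many blocks a single output stream may have queued or uploading,
    # to keep the memory used by in-memory buffering bounded.
    "spark.hadoop.fs.s3a.fast.upload.active.blocks": "2",
}


def apply_settings(builder, settings):
    """Chain .config(key, value) calls on a SparkSession.builder-like object."""
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder
```

It could be used as `apply_settings(SparkSession.builder.appName(appName), candidate_settings)` before calling `getOrCreate()`. Buffering uploads in memory trades disk usage for memory usage, so executor memory overhead may need to be raised, and shuffle data would still go to local disk.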