I want to process several independent CSV files of similar size (about 100 MB each) in parallel with PySpark. I'm running PySpark on a single machine with these settings: spark.driver.memory 20g, spark.executor.memory 2g, master local[1].

File content: type (has the same value within each csv), timestamp, price

First I tested it on one csv (note I used 35 different window functions):

    from pyspark.sql import Window
    import pyspark.sql.functions as f

    logData = spark.read.csv("TypeA.csv", header=False, schema=schema)
    # Compute moving averages. I used 35 different ones (loop over i assumed here).
    for i in range(1, 36):
        w = (Window.partitionBy("type").orderBy(f.col("timestamp").cast("long")).rangeBetween(-24 * 7 * 3600 * i, 0))
        logData = logData.withColumn(f"moving_avg_{i}", f.avg("price").over(w))
    # Some other simple operations... no aggregation, no sort
    logData.write.parquet("res.pr")

This works great. However, I had two issues with scaling this job:

  1. When I increased the number of window functions to 50, the job OOMs. I'm not sure why PySpark doesn't spill to disk in this case, since the window functions are independent of each other.
  2. When I ran the job on 2 CSV files, it also OOMs. It is also unclear why nothing is spilled to disk, since the window functions are effectively partitioned by CSV file, so they are independent.

The question is: why doesn't PySpark spill to disk in these two cases to prevent the OOM, and how can I hint Spark to do so?

Le_Coeur
  • How much RAM does the computer have in total? You probably need to lower the memory allocated to Spark. – Lars Skaug Aug 23 '20 at 19:47
  • 32 GB RAM in total – Le_Coeur Aug 23 '20 at 19:57
  • Have a look at this post. Make sure you set the memory before you start your application. spark.executor.memory does not apply. https://stackoverflow.com/questions/26562033/how-to-set-apache-spark-executor-memory – Lars Skaug Aug 23 '20 at 20:03
  • I don't have a problem with setting the driver memory - it works. My issue is that PySpark goes OOM instead of spilling data to disk, even though it clearly doesn't need to hold everything in memory for processing, as the computations are independent. – Le_Coeur Aug 23 '20 at 22:52
  • You need 60GB to process all 600 files of 100MB, so you cannot do this with just a local master with only 32GB while reading all files at once. You will want to use YARN/k8s to distribute the work and HDFS/S3 to distribute the files. Or you can upload the CSV files to a database, then write SQL from there. – OneCricketeer Feb 18 '22 at 17:53
  • This is not just about reading files; I updated the question to illustrate it. – Le_Coeur Feb 20 '22 at 19:56

2 Answers

If your machine cannot process all of the files at once, you can process them in sequence: handle one batch of files, write its output, and only then load the next batch.
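A minimal sketch of this sequential approach, assuming a list of CSV paths and the same `schema` as in the question. The helper names `batches` and `process_in_batches`, the output layout, and the default batch size are illustrative assumptions, not part of PySpark:

```python
from itertools import islice


def batches(paths, batch_size):
    """Yield consecutive lists of at most batch_size paths."""
    it = iter(paths)
    while True:
        chunk = list(islice(it, batch_size))
        if not chunk:
            return
        yield chunk


def process_in_batches(spark, paths, schema, out_dir, batch_size=1):
    """Read, transform and write one batch before loading the next.

    `spark` is an existing SparkSession; `out_dir` is a hypothetical
    output directory chosen for this sketch.
    """
    for n, chunk in enumerate(batches(paths, batch_size)):
        # spark.read.csv accepts a list of paths.
        df = spark.read.csv(chunk, header=False, schema=schema)
        # ... apply the window functions to df here ...
        df.write.mode("overwrite").parquet(f"{out_dir}/batch_{n}.parquet")
```

Each write is a separate Spark job, so only one batch is being processed at any time. The right batch size depends on the available memory and would have to be found by experiment.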

I'm not sure if this is what you mean, but you can try hinting Spark to write some of the data to disk instead of keeping it all in RAM with:

    from pyspark import StorageLevel
    df.persist(StorageLevel.MEMORY_AND_DISK)

Let me know if it helps.

ste1213

In theory, you could process all 600 files on a single machine. Spark should spill to disk when memory is insufficient. But there are some points to consider:

  1. The logic involves window aggregation, which results in a heavy shuffle. You need to check whether the OOM happened in the map phase or the reduce phase. The map phase processes each partition of a file and writes its shuffle output to files. The reduce phase then needs to fetch that shuffle output from all map tasks. Clearly, in your case you can't keep all map tasks running at once.
  2. So it's highly likely that the OOM happened in the map phase. If so, it means the memory available per core is not enough to process a single file partition. Be aware that Spark makes only a rough estimate of memory usage and spills when it thinks it should. Because the estimate is not accurate, an OOM is still possible. You can tune the partition size with the config below:
spark.sql.files.maxPartitionBytes (default 128MB)

Usually, 128 MB of input needs about 2 GB of heap, which corresponds to about 4 GB of total executor memory, since

executor JVM heap memory for execution and storage (about 0.5 of total executor memory) =
(total executor memory - spark.executor.memoryOverhead (default 0.1)) * spark.memory.fraction (default 0.6)
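As a worked example of this arithmetic, here is a small sketch. It assumes Spark's unified memory model, in which the 0.6 factor is spark.memory.fraction, spark.memory.storageFraction (default 0.5) splits that pool between storage and execution, and 300 MB of heap is reserved before the fraction is applied. The function names are made up for this illustration:

```python
RESERVED_MB = 300  # heap that Spark reserves before applying spark.memory.fraction


def unified_memory_mb(heap_mb, memory_fraction=0.6):
    """Heap shared by execution and storage (spark.memory.fraction)."""
    return (heap_mb - RESERVED_MB) * memory_fraction


def min_execution_memory_mb(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Part of the unified pool that cached blocks cannot hold on to.

    Execution may use more than this when storage is not using its share.
    """
    return unified_memory_mb(heap_mb, memory_fraction) * (1 - storage_fraction)


# 4 GB total executor memory with 10% overhead leaves about 3686 MB of heap.
heap_mb = 4096 * (1 - 0.1)
print(round(unified_memory_mb(heap_mb)))        # about 2032 MB, i.e. roughly 2 GB
print(round(min_execution_memory_mb(heap_mb)))  # about 1016 MB
```

With local[1] everything runs inside the driver JVM, so for the setup in the question the relevant heap would be the 20g driver heap rather than spark.executor.memory; `unified_memory_mb(20 * 1024)` gives about 12108 MB.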

You can post all your configs in Spark UI for further investigation.

Warren Zhu
  • After some debugging I see 2 issues: 1) In my setup I have 50 window functions (moving averages of different sizes) over a price column. If I run it over 1 CSV of 100 MB with 20GB driver memory, it OOMs. If I reduce the number of window functions to 35, the job succeeds. So why doesn't PySpark spill to disk, since the window functions are independent of each other? 2) If I now use 2 CSVs of 100 MB with 20GB driver memory and 35 window functions, the job OOMs, this time because of the 2 files. However, the window functions are partitioned by the name of the file, which should make them independent as well. – Le_Coeur Feb 20 '22 at 19:04
  • So what I ended up doing is writing the dataframe to a parquet file after every window function and then immediately reading it back. In this case the memory footprint didn't go over 4GB! This feels more like a hack, and I'm still surprised that there is no way to hint Spark to do something similar automatically. – Le_Coeur Feb 21 '22 at 03:26
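
The workaround from the last comment could look roughly like the sketch below. It is a reconstruction under assumptions: the function names, the `tmp_dir` layout and the column names `moving_avg_{i}` are hypothetical, and the commenter's actual code is not shown in the thread.

```python
def lookback_seconds(weeks):
    """Window length in seconds, as in rangeBetween(-24*7*3600*i, 0)."""
    return 24 * 7 * 3600 * weeks


def add_moving_averages(spark, df, n_windows, tmp_dir):
    """Add one moving average at a time, round-tripping through parquet."""
    # Imported here so lookback_seconds can be used without Spark installed.
    from pyspark.sql import Window
    import pyspark.sql.functions as f

    for i in range(1, n_windows + 1):
        w = (Window.partitionBy("type")
             .orderBy(f.col("timestamp").cast("long"))
             .rangeBetween(-lookback_seconds(i), 0))
        df = df.withColumn(f"moving_avg_{i}", f.avg("price").over(w))
        # Materialize the result and reload it, so the next iteration starts
        # from a plain file scan instead of the accumulated query plan.
        path = f"{tmp_dir}/step_{i}.parquet"
        df.write.mode("overwrite").parquet(path)
        df = spark.read.parquet(path)
    return df
```

Each write triggers its own job that computes only the newest window, at the cost of rewriting the data on every step. DataFrame.checkpoint() (after calling spark.sparkContext.setCheckpointDir) also truncates the plan and may serve a similar purpose, though whether it gives the same memory footprint in this case is untested here.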