
Premise: I'm not in control of my cluster and am working on the premise that the problem is in my code, not the setup my school is using. Maybe that is wrong, but it's an assumption that underlies this question.

Why does write.csv() cause my PySpark/Slurm job to exceed memory limits, when many previous operations on larger versions of the data have succeeded, and what can I do about it?

The error I'm getting is (many iterations of...):

18/06/02 16:13:41 ERROR YarnScheduler: Lost executor 21 on server.name.edu: Container killed by YARN for exceeding memory limits. 7.0 GB of 7 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

I know I can change the memory limits, but I've already increased them several times with no change in outcome, and I'm pretty convinced I shouldn't be using anywhere near this amount of memory anyway. For reference, my Slurm call is:

spark-submit \
    --master yarn \
    --num-executors 100 \
    --executor-memory 6g \
    3main.py

So what exactly am I trying to write? Well, I've read in a 39G .bz2 JSON file to an RDD,

allposts = ss.read.json(filename)

filtered a bunch, counted words, grouped the RDD, done some calculations, filtered more, and in the end I have these two print statements to give an idea of what's left...

abscounts = calculatePosts2(postRDD, sc, spark)
abscounts.printSchema()
print(abscounts.count())

These print statements work (output below). The resulting RDD is roughly 60 columns by 2,000 rows. Those 60 columns include 1 string the length of a subreddit name, and 59 doubles.

root
 |-- subreddit: string (nullable = true)
 |-- count(1): long (nullable = false)
 |-- sum(wordcount): long (nullable = true)
 |-- ingestfreq: double (nullable = true)
 |-- causefreq: double (nullable = true)
 |-- insightfreq: double (nullable = true)
 |-- cogmechfreq: double (nullable = true)
 |-- sadfreq: double (nullable = true)
 |-- inhibfreq: double (nullable = true)
 |-- certainfreq: double (nullable = true)
 |-- tentatfreq: double (nullable = true)
 |-- discrepfreq: double (nullable = true)
 |-- spacefreq: double (nullable = true)
 |-- timefreq: double (nullable = true)
 |-- exclfreq: double (nullable = true)
 |-- inclfreq: double (nullable = true)
 |-- relativfreq: double (nullable = true)
 |-- motionfreq: double (nullable = true)
 |-- quantfreq: double (nullable = true)
 |-- numberfreq: double (nullable = true)
 |-- swearfreq: double (nullable = true)
 |-- functfreq: double (nullable = true)
 |-- absolutistfreq: double (nullable = true)
 |-- ppronfreq: double (nullable = true)
 |-- pronounfreq: double (nullable = true)
 |-- wefreq: double (nullable = true)
 |-- ifreq: double (nullable = true)
 |-- shehefreq: double (nullable = true)
 |-- youfreq: double (nullable = true)
 |-- ipronfreq: double (nullable = true)
 |-- theyfreq: double (nullable = true)
 |-- deathfreq: double (nullable = true)
 |-- biofreq: double (nullable = true)
 |-- bodyfreq: double (nullable = true)
 |-- hearfreq: double (nullable = true)
 |-- feelfreq: double (nullable = true)
 |-- perceptfreq: double (nullable = true)
 |-- seefreq: double (nullable = true)
 |-- fillerfreq: double (nullable = true)
 |-- healthfreq: double (nullable = true)
 |-- sexualfreq: double (nullable = true)
 |-- socialfreq: double (nullable = true)
 |-- familyfreq: double (nullable = true)
 |-- friendfreq: double (nullable = true)
 |-- humansfreq: double (nullable = true)
 |-- affectfreq: double (nullable = true)
 |-- posemofreq: double (nullable = true)
 |-- negemofreq: double (nullable = true)
 |-- anxfreq: double (nullable = true)
 |-- angerfreq: double (nullable = true)
 |-- assentfreq: double (nullable = true)
 |-- nonflfreq: double (nullable = true)
 |-- verbfreq: double (nullable = true)
 |-- articlefreq: double (nullable = true)
 |-- pastfreq: double (nullable = true)
 |-- auxverbfreq: double (nullable = true)
 |-- futurefreq: double (nullable = true)
 |-- presentfreq: double (nullable = true)
 |-- prepsfreq: double (nullable = true)
 |-- adverbfreq: double (nullable = true)
 |-- negatefreq: double (nullable = true)
 |-- conjfreq: double (nullable = true)
 |-- homefreq: double (nullable = true)
 |-- leisurefreq: double (nullable = true)
 |-- achievefreq: double (nullable = true)
 |-- workfreq: double (nullable = true)
 |-- religfreq: double (nullable = true)
 |-- moneyfreq: double (nullable = true)

...

2026

After that the only remaining line in my code is:

  abscounts.write.csv('bigoutput.csv', header=True)

And this crashes with memory errors. This absolutely should not take up gigs of space... What am I doing wrong here?

Thanks for your help.

If you're curious or it helps, the entirety of my code is on GitHub.

kchalk
  • How many partitions does allposts dataframe have (allposts .rdd.getNumPartitions())? You may need to re-partition it and create more smaller partitions to make sure each partition + overhead fits in memory of executor – Denis Makarenko Jun 03 '18 at 05:07
  • @DenisMakarenko allposts has 318 partitions (before any filtering or processing of the data). abscounts (which is what I'm trying to write) has 200. When you say I may need to repartition-- is this the same advice as user3689574 's answer below? Forgive me, but I'm a bit in over my head on this one and am only almost following much of the responses I'm getting here... – kchalk Jun 03 '18 at 22:19

1 Answer


First of all, spark.yarn.executor.memoryOverhead is not the same as executor-memory, as you can see here.

With PySpark, memoryOverhead is important because it controls the extra memory that Python may need to perform some actions (see here), in your case collecting and saving a CSV file per partition.
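To make the difference concrete, here is a rough sketch of how the "7 GB" in the error message could arise from `--executor-memory 6g`. It assumes the Spark 2.x on YARN default for overhead (10% of executor memory, with a 384 MB floor) and assumes YARN rounds container requests up to a 1024 MB increment; the increment is a cluster setting I can't see, and `yarn_container_mb` is a hypothetical helper name, not a Spark API.

```python
import math


def yarn_container_mb(executor_memory_mb, overhead_mb=None,
                      overhead_factor=0.10, min_overhead_mb=384,
                      yarn_increment_mb=1024):
    """Estimate the YARN container size (in MB) for one executor.

    Hypothetical helper. The default factor and floor follow the
    Spark 2.x on YARN documentation; yarn_increment_mb is an assumed
    cluster setting and may differ on your cluster.
    """
    if overhead_mb is None:
        # Default overhead: 10% of executor memory, but at least 384 MB.
        overhead_mb = max(min_overhead_mb,
                          int(executor_memory_mb * overhead_factor))
    requested = executor_memory_mb + overhead_mb
    # Assumption: YARN rounds the request up to its allocation increment.
    return math.ceil(requested / yarn_increment_mb) * yarn_increment_mb


# --executor-memory 6g with the default overhead
default_limit = yarn_container_mb(6 * 1024)
# Same executor memory, with overhead set explicitly to 2048 MB
explicit_limit = yarn_container_mb(6 * 1024, overhead_mb=2048)
print(default_limit, explicit_limit)
```

Under those assumptions, the first value works out to 7168 MB, which matches the 7 GB limit in the error. Raising `--executor-memory` alone grows the JVM heap and only adds 10% for everything outside it, which is where the Python workers live. Setting the overhead independently, for example with `--conf spark.yarn.executor.memoryOverhead=2048` on the spark-submit call, targets that part directly.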

To reduce the load on Python, you may also consider using coalesce before writing.
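A minimal sketch of the coalesce suggestion, assuming `df` is a PySpark DataFrame such as `abscounts` from the question. The function name `write_small_result` and the `num_files` parameter are made up for illustration; `coalesce` and `write.csv(..., header=True)` are the standard DataFrame calls.

```python
def write_small_result(df, path, num_files=1):
    """Write a small DataFrame as CSV using only a few output files.

    Hypothetical helper: df is assumed to be a pyspark.sql.DataFrame.
    coalesce() merges existing partitions without a full shuffle, so
    the writer produces num_files part files instead of one file per
    partition.
    """
    df.coalesce(num_files).write.csv(path, header=True)


# Usage with the names from the question (not run here):
# write_small_result(abscounts, 'bigoutput.csv')
```

With about 2,000 rows spread over 200 partitions, each partition holds only around ten rows, so the write otherwise produces 200 tiny part files. Note that the output path is still a directory containing the part files. Whether this alone resolves the memory error is not something the sketch can confirm, since the write also re-runs the upstream computation.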

user3689574
  • Thank you for your answer. I have to admit that I don't completely follow what's covered in your links, but I get that I do actually need to set memory overhead independently. And presumably coalesce will reduce that overhead somehow? But how many partitions/ how much memory overhead should I need for an RDD of ~60 doubles and one 20 char string in a max of 120k entries? – kchalk Jun 03 '18 at 22:32