27

After reading through the documentation I do not understand how Spark running on YARN accounts for Python memory consumption.

Does it count towards spark.executor.memory, spark.executor.memoryOverhead, or somewhere else?

In particular, I have a PySpark application with spark.executor.memory=25G and spark.executor.cores=4, and I encounter frequent `Container killed by YARN for exceeding memory limits` errors when running a map on an RDD. It operates on a fairly large number of complex Python objects, so it is expected to take up a non-trivial amount of memory, but not 25GB. How should I configure the different memory variables for use with heavy Python code?
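
For reference, here is a simplified sketch of how the job is configured and what the failing stage looks like (the app name and the toy map function are illustrative stand-ins, not the real code):

from pyspark import SparkConf, SparkContext

# Simplified sketch of the current setup; only executor memory and cores
# are set explicitly, nothing Python-specific.
conf = (SparkConf()
        .setAppName("heavy-python-map")          # illustrative name
        .set("spark.executor.memory", "25g")
        .set("spark.executor.cores", "4"))
sc = SparkContext(conf=conf)

# The failing stage is a plain map that builds largish Python objects
# (stand-in for the real transformation):
rdd = sc.parallelize(range(100000))
counts = rdd.map(lambda x: {"id": x, "payload": list(range(200))}).count()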


1 Answer

15

I'd try increasing spark.python.worker.memory from its default (512m), because of the heavy Python code. This property's value does not count towards spark.executor.memory.

Amount of memory to use per python worker process during aggregation, in the same format as JVM memory strings (e.g. 512m, 2g). If the memory used during aggregation goes above this amount, it will spill the data into disks. (from the Spark configuration documentation)
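
For example, a minimal sketch of setting it (the 2g value is only an illustrative starting point to tune, not a recommendation specific to your job):

from pyspark import SparkConf, SparkContext

# Sketch: raise the per-worker Python aggregation memory above the 512m default.
conf = (SparkConf()
        .set("spark.python.worker.memory", "2g")   # illustrative value, tune as needed
        .set("spark.executor.memory", "25g")
        .set("spark.executor.cores", "4"))
sc = SparkContext(conf=conf)

The same property can also be passed to spark-submit with --conf spark.python.worker.memory=2g.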

ExecutorMemoryOverhead calculation in Spark:

MEMORY_OVERHEAD_FRACTION = 0.10
MEMORY_OVERHEAD_MINIMUM = 384
val executorMemoryOverhead =
  max(MEMORY_OVERHEAD_FRACTION * ${spark.executor.memory}, MEMORY_OVERHEAD_MINIMUM)
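
Plugging in the 25G executor from the question (a rough worked example in MiB; any rounding YARN applies to container sizes is ignored):

# Rough worked example for spark.executor.memory = 25g (values in MiB).
MEMORY_OVERHEAD_FRACTION = 0.10
MEMORY_OVERHEAD_MINIMUM = 384

executor_memory = 25 * 1024                          # 25600 MiB
overhead = max(MEMORY_OVERHEAD_FRACTION * executor_memory,
               MEMORY_OVERHEAD_MINIMUM)              # 2560 MiB
container_request = executor_memory + overhead       # 28160 MiB requested from YARN
print(overhead, container_request)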

The corresponding property is spark.yarn.executor.memoryOverhead on YARN and spark.mesos.executor.memoryOverhead on Mesos.

YARN kills processes that take more memory than they requested, and the request is the sum of executorMemory and executorMemoryOverhead.

In the image below, the Python worker processes use spark.python.worker.memory, while spark.yarn.executor.memoryOverhead + spark.executor.memory applies to the JVM.

[PySpark Internals diagram] (image credits)

Additional resource: Apache mailing list thread

  • @mr-srinivas `spark.python.worker.memory` talks about memory used during *aggregation*. What about when not aggregating anything, for example during a simple map stage? – domkck Oct 26 '16 at 16:06
  • @domkck: I/O in Spark is performed by the JVM only (refer to the image), and data is moved to the Python processes via a pipe for aggregations. Please check the updated answer; I am not sure how I missed your question till now. – mrsrinivas Feb 07 '17 at 06:34
  • @mrsrinivas I use pyspark to extract features with the combineByKey operator and the default `spark.python.worker.memory` setting, but the python worker process uses about 10g of memory and YARN killed the container. Why does the `spark.python.worker.memory` setting not work? Can you show me some reference for profiling python worker memory? Thanks so much – Adooo Aug 25 '18 at 01:24
  • The answer still seems a bit unclear. If `spark.python.worker.memory` is not accounted for in (i.e. is not a subset of) `spark.executor.memory`, is it part of `spark.yarn.executor.memoryOverhead` then? `spark.python.worker.memory` is clearly off-heap, and YARN kills a container if `spark.executor.memory` + `spark.yarn.executor.memoryOverhead` > `yarn.nodemanager.resource.memory-mb`. This seems to suggest that `spark.python.worker.memory` should be accounted for in `spark.yarn.executor.memoryOverhead`. Can you confirm? – Martin Studer Nov 15 '18 at 13:45