I have the following minimal working example:
from pyspark import SparkContext
from pyspark.sql import SQLContext

import numpy as np

sc = SparkContext()
sqlContext = SQLContext(sc)

# Create dummy pySpark DataFrame with 1e5 rows and 16 partitions
df = sqlContext.range(0, int(1e5), numPartitions=16)

def toy_example(rdd):
    # Read in pySpark DataFrame partition
    data = list(rdd)

    # Generate random data using Numpy
    rand_data = np.random.random(int(1e7))

    # Apply the `int` function to each element of `rand_data`
    for i in range(len(rand_data)):
        e = rand_data[i]
        int(e)

    # Return a single `0` value
    return [[0]]

# Execute the above function on each partition (16 partitions)
result = df.rdd.mapPartitions(toy_example)
result = result.collect()
When the above is run, the memory of the executor's Python process steadily increases after each iteration, suggesting that the memory from the previous iteration is not being released, i.e. a memory leak. This can lead to a job failure when the memory exceeds the executor's memory limit.
Bizarrely, any of the following prevents the memory leak (the second fix is sketched after this list):
- Remove the line data = list(rdd)
- Insert the line rand_data = list(rand_data.tolist()) after rand_data = np.random.random(int(1e7))
- Remove the line int(e)
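For illustration, applying the second fix to the toy function looks like this; only the tolist conversion line is new relative to the original example:

def toy_example(rdd):
    data = list(rdd)
    rand_data = np.random.random(int(1e7))
    # Workaround: materialising the Numpy array as a plain Python list
    # (at the cost of extra time and memory) stops the leak
    rand_data = list(rand_data.tolist())
    for i in range(len(rand_data)):
        e = rand_data[i]
        int(e)
    return [[0]]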
The above code is a minimal working example of a much larger project which cannot use the above fixes.
Some things to take notice of:
- While the rdd data is not used in the function, the line is required to reproduce the leak. In the real-world project, the rdd data is used.
- The memory leak is likely due to the large Numpy array rand_data not being released.
- You have to perform the int operation on each element of rand_data to reproduce the leak.
Question
Can you force the PySpark executor to release the memory of rand_data
by inserting code in the first few lines or last few lines of the toy_example
function?
What has already been attempted
Force garbage collection by inserting at the end of the function:
del data, rand_data
import gc
gc.collect()
Force memory release by inserting at the end or beginning of the function (inspired by a Pandas issue):
from ctypes import cdll, CDLL
cdll.LoadLibrary("libc.so.6")
libc = CDLL("libc.so.6")
libc.malloc_trim(0)
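For clarity, this is roughly how the two attempts look when placed together at the end of the toy function (the exact combination shown here is illustrative); neither stopped the memory growth:

from ctypes import cdll, CDLL
import gc

def toy_example(rdd):
    data = list(rdd)
    rand_data = np.random.random(int(1e7))
    for i in range(len(rand_data)):
        e = rand_data[i]
        int(e)
    # Attempt 1: drop the references and force a garbage collection pass
    del data, rand_data
    gc.collect()
    # Attempt 2: ask glibc to return freed heap memory to the OS
    cdll.LoadLibrary("libc.so.6")
    libc = CDLL("libc.so.6")
    libc.malloc_trim(0)
    return [[0]]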
Setup, measurement and versions
The following PySpark job was run on an AWS EMR cluster with a single m4.xlarge worker node. Numpy had to be pip-installed on the worker node via a bootstrap action.
The memory of the executor was measured using the following call (the value was printed to the executor's log):
import resource
resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
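One way this value could be captured per partition is with a small helper called at the start and end of toy_example; the helper name below is made up for illustration and is not part of the original code:

from __future__ import print_function
import resource

def log_peak_rss(tag):
    # ru_maxrss is the peak resident set size of this Python worker,
    # reported in kilobytes on Linux
    print(tag, resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)

Calling log_peak_rss("before") and log_peak_rss("after") around the function body shows the growth after each partition in the executor's stderr log.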
Spark submit config:
- spark.executor.instances = 1
- spark.executor.cores = 1
- spark.executor.memory = 6g
- spark.master = yarn
- spark.dynamicAllocation.enabled = false
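The same settings could equally be expressed in code via SparkConf rather than as spark-submit flags; this is just an alternative way of writing the configuration above, not how the job was actually submitted:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("yarn")
        .set("spark.executor.instances", "1")
        .set("spark.executor.cores", "1")
        .set("spark.executor.memory", "6g")
        .set("spark.dynamicAllocation.enabled", "false"))
sc = SparkContext(conf=conf)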
Versions:
- EMR 5.12.1
- Spark 2.2.1
- Python 2.7.13
- Numpy 1.14.0