
I submit my code to a Spark standalone cluster. The submit command is below:

nohup ./bin/spark-submit \
--master spark://ES01:7077 \
--executor-memory 4G \
--num-executors 1 \
--total-executor-cores 1 \
--conf "spark.storage.memoryFraction=0.2"  \
./myCode.py 1>a.log 2>b.log &

I specified 4G of executor memory in the command above. But when I monitor the executor process with top, its memory usage keeps growing. The current top output is below:

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND                                                                                                                                                    
12578 root      20   0 20.223g 5.790g  23856 S  61.5 37.3  20:49.36 java       

The machine has 16G of memory in total, so 37.3% (about 5.8G resident) is already more than the 4G I specified, and it is still growing.

The ps command confirms that PID 12578 is the executor process:

[root@ES01 ~]# ps -awx | grep spark | grep java
10409 ?        Sl     1:43 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --ip ES01 --port 7077 --webui-port 8080
10603 ?        Sl     6:16 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://ES01:7077
12420 ?        Sl    10:16 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --master spark://ES01:7077 --conf spark.storage.memoryFraction=0.2 --executor-memory 4G --num-executors 1 --total-executor-cores 1 /opt/flowSpark/sparkStream/ForAsk01.py
12578 ?        Sl    21:03 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4096M -Xmx4096M -Dspark.driver.port=52931 -XX:MaxPermSize=256m org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.79.148.184:52931 --executor-id 0 --hostname 10.79.148.184 --cores 1 --app-id app-20160511080701-0013 --worker-url spark://Worker@10.79.148.184:52660
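As a rough cross-check of the figures above, the executor's -Xmx value from the ps output can be compared with the RES column from top. Keep in mind that -Xmx caps only the Java heap, while resident memory also counts PermGen, thread stacks and other native allocations, so the two numbers are not expected to match exactly. The sketch below is only arithmetic on the values already shown; the helper names parse_size and max_heap_bytes are made up for illustration, and the command line is abbreviated.

```python
import re

# Binary multipliers, as used by both the JVM flags and top's g/m suffixes.
_UNITS = {"k": 1024, "m": 1024 ** 2, "g": 1024 ** 3, "t": 1024 ** 4}


def parse_size(text):
    """Convert a size such as '4096M' or '5.790g' into bytes.

    A bare number is treated as bytes, which is how the JVM reads -Xmx.
    """
    match = re.fullmatch(r"([0-9.]+)([kmgt]?)", text.strip().lower())
    if match is None:
        raise ValueError("unrecognised size: %r" % text)
    number, unit = match.groups()
    return float(number) * _UNITS.get(unit, 1)


def max_heap_bytes(cmdline):
    """Return the -Xmx value found in a java command line, or None."""
    match = re.search(r"-Xmx(\S+)", cmdline)
    return parse_size(match.group(1)) if match else None


# Abbreviated executor command line (PID 12578) and its RES value from top.
executor_cmdline = "java -cp ... -Xms4096M -Xmx4096M -Dspark.driver.port=52931 ..."
heap_limit = max_heap_bytes(executor_cmdline)
resident = parse_size("5.790g")
overshoot_gib = (resident - heap_limit) / 1024 ** 3

print("heap limit: %.2f GiB" % (heap_limit / 1024 ** 3))
print("resident:   %.2f GiB" % (resident / 1024 ** 3))
print("beyond heap: %.2f GiB" % overshoot_gib)
```

With the values from the question, roughly 1.8 GiB of resident memory sits outside the 4 GiB heap limit, which is the part that keeps growing.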

Below is the code. It is very simple, so I do not think it contains a memory leak.

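# Imports used by the driver code and by process() further down
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import HiveContext, Row
from pyspark.sql import functions as func
from pyspark.sql.window import Window

if __name__ == "__main__":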

    dataDirectory = '/stream/raw'

    sc = SparkContext(appName="Netflow")
    ssc = StreamingContext(sc, 20)

    # Read CSV File
    lines = ssc.textFileStream(dataDirectory)

    lines.foreachRDD(process)

    ssc.start()
    ssc.awaitTermination()

The code for the process function is below. Note that I am using HiveContext rather than SQLContext here, because SQLContext does not support window functions.

def getSqlContextInstance(sparkContext):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = HiveContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def process(time, rdd):

    if rdd.isEmpty():
        return sc.emptyRDD()

    sqlContext = getSqlContextInstance(rdd.context)

    # Convert CSV File to Dataframe
    parts = rdd.map(lambda l: l.split(","))
    rowRdd = parts.map(lambda p: Row(router=p[0], interface=int(p[1]), flow_direction=p[9], bits=int(p[11])))
    dataframe = sqlContext.createDataFrame(rowRdd)

    # Get the top 2 interfaces of each router
    dataframe = dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits'))
    windowSpec = Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc())
    rank = func.dense_rank().over(windowSpec)
    ret = dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'], rank.alias('rank')).filter("rank<=2")

    ret.show()
    dataframe.show()
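To make clear what the grouping and ranking step computes, here is a plain-Python sketch of the same logic without Spark: sum bits per (router, interface), rank the interfaces of each router by total bits with dense-rank semantics, and keep ranks up to 2. The function name top_interfaces and the sample rows are invented for illustration. This mirrors only the result, not how Spark SQL executes the query.

```python
from collections import defaultdict


def top_interfaces(rows, limit=2):
    """rows: iterable of (router, interface, bits) tuples."""
    # Counterpart of groupBy(['router', 'interface']).agg(sum('bits'))
    totals = defaultdict(int)
    for router, interface, bits in rows:
        totals[(router, interface)] += bits

    # Counterpart of Window.partitionBy('router')
    by_router = defaultdict(list)
    for (router, interface), bits in totals.items():
        by_router[router].append((interface, bits))

    result = []
    for router in sorted(by_router):
        # Counterpart of orderBy(bits.desc())
        ranked = sorted(by_router[router], key=lambda item: -item[1])
        rank, previous = 0, None
        for interface, bits in ranked:
            # dense_rank: ties share a rank and no rank number is skipped
            if bits != previous:
                rank += 1
                previous = bits
            if rank <= limit:
                result.append((router, interface, bits, rank))
    return result


# Hypothetical sample data: (router, interface, bits)
sample = [
    ("r1", 1, 100), ("r1", 1, 50), ("r1", 2, 150), ("r1", 3, 10),
    ("r2", 7, 5),
]
print(top_interfaces(sample))
# → [('r1', 1, 150, 1), ('r1', 2, 150, 1), ('r1', 3, 10, 2), ('r2', 7, 5, 1)]
```

Because dense_rank lets tied rows share a rank, the filter rank<=2 can return more than two interfaces per router, as router r1 shows here.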

Actually I found below code will cause the problem:

    # Get the top 2 interfaces of each router
    dataframe = dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits'))
    windowSpec = Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc())
    rank = func.dense_rank().over(windowSpec)
    ret = dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'], rank.alias('rank')).filter("rank<=2")
    ret.show()

If I remove these 5 lines, the code can run all night without any memory increase. Adding them back makes the executor's memory usage grow to a very high number.

Basically, the code above is just a groupBy plus a window function in Spark SQL. So is this a bug?

Kramer Li

2 Answers


Disclaimer: this answer isn't based on debugging, but on observation and on the documentation Apache Spark provides.

I don't believe that this is a bug to begin with!

Looking at your configurations, we can see that you are focusing mostly on the executor tuning, which isn't wrong, but you are forgetting the driver part of the equation.

Looking at the cluster overview from the Apache Spark documentation:

[Figure: Spark cluster overview diagram from the Apache Spark documentation]

As you can see, each worker has an executor. In your case, however, the worker node is the same machine as the driver node, which is what happens when you run locally or on a single-node standalone cluster.

Further, the driver takes 1G of memory by default unless tuned with the spark.driver.memory setting. You should also not forget the memory the JVM itself uses, and the Web UI, which as far as I know is also served by the driver.

When you delete the lines of code you mentioned, your code is left without any action, as map is just a transformation. Hence nothing is executed, and therefore you see no memory increase at all.

The same applies to groupBy and agg: they are just transformations and are not executed until an action is called, which in your case is show further down the stream.
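The transformation/action distinction can be illustrated with a plain-Python analogy. This is not Spark code: Python 3's built-in map is lazy in a loosely similar way, so it shows that describing work and running it are separate steps. The names parse, executed and the sample lines are invented for the illustration.

```python
executed = []  # records which lines were actually processed


def parse(line):
    executed.append(line)
    return line.split(",")


lines = ["r1,1", "r2,7"]

# Like a transformation: this only describes the work, nothing runs yet.
parsed = map(parse, lines)
calls_before = len(executed)

# Like an action: consuming the result forces the work to happen.
first_rows = list(parsed)
calls_after = len(executed)

print(calls_before, calls_after)
# → 0 2
```

In Spark the same idea holds at a larger scale: a job is scheduled, and executor memory is used, only once an action such as show asks for results.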

That said, try to minimize your driver memory, and limit the overall number of cores the application uses with spark.cores.max if you want to control the cores for this process, then work your way down to the executors. I would also add spark.python.profile.dump to your configuration so that you get a profile of your Spark job execution, which can help you understand what is going on and tune the cluster to your needs.
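As a minimal sketch of how these settings could be put together, the snippet below assembles a spark-submit argument list. The helper spark_submit_args is hypothetical, and the values (512m, 1 core, /tmp/spark-profile) are placeholders to adjust, not recommendations. It also sets spark.python.profile to true, which the Spark configuration reference describes as the switch that enables the Python profiler whose results spark.python.profile.dump writes out.

```python
def spark_submit_args(script, master, driver_memory, conf):
    """Build a spark-submit argument list (hypothetical helper)."""
    args = ["./bin/spark-submit", "--master", master,
            "--driver-memory", driver_memory]
    # Sort the keys so the generated command is stable between runs.
    for key in sorted(conf):
        args += ["--conf", "%s=%s" % (key, conf[key])]
    args.append(script)
    return args


# Placeholder values, chosen only for illustration.
conf = {
    "spark.cores.max": "1",
    "spark.python.profile": "true",
    "spark.python.profile.dump": "/tmp/spark-profile",
}
args = spark_submit_args("./myCode.py", "spark://ES01:7077", "512m", conf)
print(" ".join(args))
```

The printed line can be pasted into a shell, or the list can be passed to subprocess directly without any quoting concerns.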

mamdouh alramadan
  • Hi, thanks for the answer. But 1. I still have a dataframe.show() after removing those lines, so there is still an action. 2. In my case, the stream calculation can run for a few hours, which means thousands of loops (the interval is 20 seconds). During this time the memory usage of the executor keeps growing. So I do not understand what your suggested solution is. Minimize my driver memory? Why? – Kramer Li May 23 '16 at 03:19
  • I didn't realize that the show is not part of the deleted code. As for the memory, I'm building based on trying to minimize it and see if it will overflow! – mamdouh alramadan May 23 '16 at 03:41

From what I can see in your 5 lines, the groupBy may be the issue. Could you try reduceByKey instead and see how it performs?

See here and here.

RoyaumeIX
  • Thanks for the info. But I'm hoping to find out whether it is a bug or whether I am not using it the right way. – Kramer Li May 11 '16 at 08:10
  • @Tristan It is not the same groupBy as on RDD, See http://stackoverflow.com/q/32902982/1560062 – zero323 May 11 '16 at 10:27
  • I assume this CSV file is stored on HDFS. What is its size? How much does it grow or change, and how often? What I am trying to understand is how much data you need to process at each batch interval, and what that interval is (1 sec by default)? – z-star May 16 '16 at 07:24
  • @z-star Yes, the CSV file is on HDFS. The size is a few KB (less than 1MB). The batch interval is 20s. The amount of data in each interval does not change. I do not see an increase in the total delay or processing time, so I do not think the workload is the problem. – Kramer Li May 16 '16 at 08:17