67

I'm running a 5-node Spark cluster on AWS EMR, each node sized m3.xlarge (1 master, 4 slaves). I successfully ran through a 146MB bzip2-compressed CSV file and ended up with a perfectly aggregated result.

Now I'm trying to process a ~5GB bzip2 CSV file on this cluster but I'm receiving this error:

16/11/23 17:29:53 WARN TaskSetManager: Lost task 49.2 in stage 6.0 (TID xxx, xxx.xxx.xxx.compute.internal): ExecutorLostFailure (executor 16 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

I'm confused as to why I'm hitting a ~10.5GB memory limit on a ~75GB cluster (15GB per m3.xlarge instance)...

Here is my EMR config:

[
  {
    "classification": "spark-env",
    "properties": {},
    "configurations": [
      {
        "classification": "export",
        "properties": {
          "PYSPARK_PYTHON": "python34"
        },
        "configurations": []
      }
    ]
  },
  {
    "classification": "spark",
    "properties": {
      "maximizeResourceAllocation": "true"
    },
    "configurations": []
  }
]

From what I've read, setting the maximizeResourceAllocation property should tell EMR to configure Spark to fully utilize all resources available on the cluster. I.e., I should have ~75GB of memory available... So why am I getting a ~10.5GB memory limit error? Here is the code I'm running:

import statistics
import sys

import arrow
import pyspark
import pyspark.sql
import pyspark.sql.functions


def sessionize(raw_data, timeout):
    # https://www.dataiku.com/learn/guide/code/reshaping_data/sessionization.html
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp"))
    diff = (pyspark.sql.functions.lag(raw_data.timestamp, 1)
            .over(window))
    time_diff = (raw_data.withColumn("time_diff", raw_data.timestamp - diff)
                 .withColumn("new_session",
                             pyspark.sql.functions.when(
                                 pyspark.sql.functions.col("time_diff") >= timeout.seconds, 1
                             ).otherwise(0)))
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp")
              .rowsBetween(-1, 0))
    sessions = time_diff.withColumn(
        "session_id",
        pyspark.sql.functions.concat_ws(
            "_", "user_id", "site_id",
            pyspark.sql.functions.sum("new_session").over(window)))
    return sessions


def aggregate_sessions(sessions):
    median = pyspark.sql.functions.udf(lambda x: statistics.median(x))
    aggregated = sessions.groupBy(pyspark.sql.functions.col("session_id")).agg(
        pyspark.sql.functions.first("site_id").alias("site_id"),
        pyspark.sql.functions.first("user_id").alias("user_id"),
        pyspark.sql.functions.count("id").alias("hits"),
        pyspark.sql.functions.min("timestamp").alias("start"),
        pyspark.sql.functions.max("timestamp").alias("finish"),
        median(pyspark.sql.functions.collect_list("foo")).alias("foo"),
    )
    return aggregated


spark_context = pyspark.SparkContext(appName="process-raw-data")
spark_session = pyspark.sql.SparkSession(spark_context)
raw_data = spark_session.read.csv(sys.argv[1],
                                  header=True,
                                  inferSchema=True)
# Windowing doesn't seem to play nicely with TimestampTypes.
#
# Should be able to do this within the ``spark.read.csv`` call, I'd
# think. Need to look into it.
convert_to_unix = pyspark.sql.functions.udf(lambda s: arrow.get(s).timestamp)
raw_data = raw_data.withColumn("timestamp",
                               convert_to_unix(pyspark.sql.functions.col("timestamp")))
sessions = sessionize(raw_data, SESSION_TIMEOUT)
aggregated = aggregate_sessions(sessions)
aggregated.foreach(save_session)

Basically, nothing more than windowing and a groupBy to aggregate the data.

The job starts with a few of those errors, and as it approaches a halt the same error appears more and more frequently.

I've tried running spark-submit with --conf spark.yarn.executor.memoryOverhead but that doesn't seem to solve the problem either.

lauri108
  • Would you care to post the whole error log? Your description doesn't make much sense. – eliasah Nov 24 '16 at 08:44
  • Hi @eliasah, please check http://pastebin.com/rPAXySWm for a full error log. – lauri108 Nov 24 '16 at 08:50
  • what is the value of `spark.executor.memory` ? – mrsrinivas Nov 24 '16 at 09:20
  • @mrsrinivas, I haven't set that value at all. Also, can't find that in http://docs.aws.amazon.com/ElasticMapReduce/latest/ReleaseGuide/emr-configure-apps.html – lauri108 Nov 24 '16 at 09:28
  • Ah okay, @mrsrinivas I found it in the [Spark docs](http://spark.apache.org/docs/latest/configuration.html) instead. The default seems to be 1Gb – lauri108 Nov 24 '16 at 09:30
  • you can pass the property in spark submit (eg: `--conf spark.executor.memory=20g` ) – mrsrinivas Nov 24 '16 at 09:45
  • m3.xlarge has 15GB memory, so you cannot set `spark.executor.memory=20G`. Some memory has to be reserved for the OS (around 1GB) and you probably also need to increase the memoryOverhead a bit to say 2GB. That will leave you with 12GB for executor-memory. – Glennie Helles Sindholt Nov 24 '16 at 10:20
  • Oh, and the 10.4GB is per executor. If you have 4 slaves, then you have 4*10.4GB = 40GB memory in use. You will never get "full" utilization of your cluster - some space has to be left for the OS and some space is used internally by Spark. As a general rule of thumb you can assign some 75-80% of available memory to Spark. – Glennie Helles Sindholt Nov 24 '16 at 10:24
  • 20g is just an example. We should be careful when `groupBy` is applied on a huge dataset. – mrsrinivas Nov 25 '16 at 04:43
  • @mrsrinivas, I thought groupBy was just an issue with RDDs. Does the DataFrame API suffer from the same problem? Would you recommend trying a windowing approach to aggregate the data, rather than a groupBy? – lauri108 Nov 25 '16 at 16:17
  • @mrsrinivas and @glennie-helles-sindholt, I've re-run with 1+4 m3.xlarge machines: `spark-submit --deploy-mode cluster --conf spark.executor.memory=12g --conf spark.yarn.executor.memoryOverhead=2048 --conf spark.memory.fraction=0.8 --conf spark.memory.storageFraction=0.35` Getting this error immediately upon Step start: `Exception in thread "main" java.lang.IllegalArgumentException: Required executor memory (12288+2048 MB) is above the max threshold (11520 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'` – lauri108 Nov 25 '16 at 16:23

5 Answers

73

I feel your pain..

We had similar issues of running out of memory with Spark on YARN. We have five 64GB, 16-core VMs, and regardless of what we set spark.yarn.executor.memoryOverhead to, we just couldn't get enough memory for these tasks -- they would eventually die no matter how much memory we gave them. And it was a relatively straightforward Spark application that was causing this to happen.

We figured out that the physical memory usage was quite low on the VMs but the virtual memory usage was extremely high (despite the logs complaining about physical memory). We set yarn.nodemanager.vmem-check-enabled in yarn-site.xml to false and our containers were no longer killed, and the application appeared to work as expected.
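For reference, the property this refers to looks like the following in yarn-site.xml (a minimal sketch; on an EMR cluster you would more likely set it through a configuration classification, as a later answer shows):

<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>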

Doing more research, I found the answer to why this happens here: http://web.archive.org/web/20190806000138/https://mapr.com/blog/best-practices-yarn-resource-management/

Since on Centos/RHEL 6 there are aggressive allocation of virtual memory due to OS behavior, you should disable virtual memory checker or increase yarn.nodemanager.vmem-pmem-ratio to a relatively larger value.

That page had a link to a very useful page from IBM: https://web.archive.org/web/20170703001345/https://www.ibm.com/developerworks/community/blogs/kevgrig/entry/linux_glibc_2_10_rhel_6_malloc_may_show_excessive_virtual_memory_usage?lang=en

In summary, glibc > 2.10 changed its memory allocation. And although huge amounts of virtual memory being allocated isn't the end of the world, it doesn't work with the default settings of YARN.

Instead of setting yarn.nodemanager.vmem-check-enabled to false, you could also play with setting the MALLOC_ARENA_MAX environment variable to a low number in hadoop-env.sh. This bug report has helpful information about that: https://issues.apache.org/jira/browse/HADOOP-7154
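As a sketch of that alternative, the variable can be exported from hadoop-env.sh; the value 4 below is the one discussed in HADOOP-7154, so treat it as a starting point rather than a tuned recommendation:

# hadoop-env.sh: cap the number of glibc malloc arenas per JVM process
export MALLOC_ARENA_MAX=4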

I recommend reading through both pages -- the information is very handy.

Duff
  • property is `yarn.nodemanager.vmem-check-enabled`, note hyphens – Joffer May 09 '17 at 15:15
  • I didn't find this property in yarn-site.xml. I am using Spark with Amazon EMR – lfvv Aug 30 '17 at 19:32
  • @lfvv you may need to manually add it. You can find various other settings here: https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-common/yarn-default.xml – Duff Aug 30 '17 at 20:59
  • I don't think telling the resource manager to no longer manage its resources properly is a nice solution. – Clemens Valiente Jan 19 '18 at 11:17
  • @ClemensValiente I feel like you're right... In which case maybe tweaking MALLOC_ARENA_MAX is the better way to go. I haven't experimented with that myself, however. – Duff Jan 19 '18 at 17:57
  • @ClemensValiente Mind you, this isn't so much telling the resource manager not to manage its resources, but to not take into account the overly aggressive allocation of virtual (not physical) memory. The resource manager still manages every other aspect as-is. – Duff Jan 19 '18 at 18:04
  • Not really sure how disabling virtual memory checking would help in this situation, since the log clearly shows the container was killed by a physical memory check. – Joel Croteau Apr 28 '19 at 00:58
  • @JoelCroteau, it's been two years since I wrote this answer, but I remember being very annoyed to find that the log message was so misleading. I kept going back to look at why it kept reporting that "x of x physical memory used" but "x" was always far less memory than I had allocated. So... Although this solution worked for me, I really can't blame you for your observation. :) – Duff Apr 29 '19 at 03:37
20

If you're not using spark-submit, and you're looking for another way to specify the yarn.nodemanager.vmem-check-enabled parameter mentioned by Duff, here are 2 other ways:

Method 2

If you're using a JSON Configuration file (that you pass to the AWS CLI or to your boto3 script), you'll have to add the following configuration:

[
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.nodemanager.vmem-check-enabled": "false"
    }
  }
]
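For example, such a file is typically passed when the cluster is created. The file name below is hypothetical, and the remaining options simply mirror the cluster described in the question (release label and roles are illustrative):

aws emr create-cluster \
    --release-label emr-5.2.0 \
    --applications Name=Spark \
    --instance-type m3.xlarge \
    --instance-count 5 \
    --use-default-roles \
    --configurations file://./yarn-config.json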

Method 3

If you use the EMR console, add the following configuration:

classification=yarn-site,properties=[yarn.nodemanager.vmem-check-enabled=false]
louis_guitton
13

See,

I had the same problem in a huge cluster that I'm working on now. The problem will not be solved by adding memory to the worker. Sometimes during aggregation Spark will use more memory than it has, and the Spark jobs will start to use off-heap memory.

One simple example is:

If you have a dataset that you need to reduceByKey, it will sometimes aggregate more data in one worker than in another, and if this data exceeds the memory of that worker you get that error message.

Adding the option spark.yarn.executor.memoryOverhead will help if you set it to about 50% of the memory used by the worker (just as a test, to see if it works; you can use less with more tests).

But you need to understand how Spark handles memory allocation in the cluster:

  1. In the most common setup, Spark uses 75% of the machine memory. The rest goes to the OS.
  2. Spark has two types of memory during execution: one part is for execution and the other is for storage. Execution memory is used for shuffles, joins, aggregations, etc. Storage memory is used for caching and propagating data across the cluster.

One good thing about this memory allocation: if you are not using cache in your execution, you can configure Spark to use that storage space for execution and partly avoid the OOM error. As you can see in the Spark documentation:

This design ensures several desirable properties. First, applications that do not use caching can use the entire space for execution, obviating unnecessary disk spills. Second, applications that do use caching can reserve a minimum storage space (R) where their data blocks are immune to being evicted. Lastly, this approach provides reasonable out-of-the-box performance for a variety of workloads without requiring user expertise of how memory is divided internally.
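To put rough numbers on that quote: under the unified memory model it describes (Spark 1.6+), the executor heap is split roughly as sketched below. The heap size is hypothetical and the fractions shown are the Spark 2.x defaults:

# Rough sketch of Spark's unified memory sizing, illustrative numbers only
executor_heap_mb = 10 * 1024   # hypothetical spark.executor.memory = 10g
reserved_mb = 300              # fixed reservation Spark keeps for itself
memory_fraction = 0.6          # spark.memory.fraction (Spark 2.x default)
storage_fraction = 0.5         # spark.memory.storageFraction (default)

unified_mb = (executor_heap_mb - reserved_mb) * memory_fraction
storage_min_mb = unified_mb * storage_fraction  # cached blocks immune to eviction
# Execution can borrow everything above storage_min_mb when nothing is cached.
print(unified_mb, storage_min_mb)  # 5964.0 2982.0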

But how can we use that?

You can change some configurations: add the memoryOverhead setting to your job call, but consider adding these too: change spark.memory.fraction to 0.8 or 0.85 and reduce spark.memory.storageFraction to 0.35 or 0.2.
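As a concrete illustration of passing those settings at submit time (the script name is hypothetical, and the values are the ones suggested above rather than something tuned for this particular cluster):

spark-submit \
  --deploy-mode cluster \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  --conf spark.memory.fraction=0.8 \
  --conf spark.memory.storageFraction=0.35 \
  my_job.py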

Other configurations can help, but you need to check them for your case. See all these configurations here.

Now, what helped in my case:

I have a cluster with 2.5K workers and 2.5TB of RAM, and we were facing an OOM error like yours. We just increased spark.yarn.executor.memoryOverhead to 2048 and enabled dynamic allocation. When we call the job, we don't set the memory for the workers; we leave that for Spark to decide. We just set the overhead.

But for some tests on my small cluster, I changed the sizes of execution and storage memory. That solved the problem.

Thiago Baldim
  • I've re-run with these params, using a cluster of 1+4 m3.xlarge machines: `spark-submit --deploy-mode cluster --conf spark.executor.memory=12g --conf spark.yarn.executor.memoryOverhead=2048 --conf spark.memory.fraction=0.8 --conf spark.memory.storageFraction=0.35` and getting this error immediately upon Step start: `Exception in thread "main" java.lang.IllegalArgumentException: Required executor memory (12288+2048 MB) is above the max threshold (11520 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'` – lauri108 Nov 25 '16 at 16:18
  • The message is telling you exactly what you need to do: your `spark.executor.memory`+`spark.yarn.executor.memoryOverhead` must be less than `yarn.nodemanager.resource.memory-mb`. I would suggest you decrease `memoryOverhead`; for a 15g node, it can be 1g (`1024 mb`). I would also increase your `yarn.nodemanager.resource.memory-mb` to `12288 mb` and decrease your `spark.executor.memory` to `11264 mb`. If that doesn't work, then increase `yarn.nodemanager.resource.memory-mb` to `13312 mb`, and tell me what your `yarn.scheduler.maximum-allocation-mb` is. – makansij May 14 '17 at 18:58
  • This is a better, less intrusive option than the accepted answer. If you have other applications already running on the YARN instance Spark is executing through, changing the yarn-site.xml can be really risky and have broad consequences. – josiah Aug 06 '18 at 19:33
  • I can't say about old versions, but Spark 2.3.1 doesn't use off-heap when it runs short of heap. It splits memory into 2 pools: execution and storage. When one of the pools overflows, it bites off part of the other pool. When both pools are full, the task is blocked until free memory is available. – Avseiytsev Dmitriy Sep 05 '18 at 07:56
  • It seems like a pretty misleading error message. Because it seems like the problem is not that memoryOverhead is low, but rather overall executor memory. So the better solution/error message would probably be to increase executor memory? – Ted Jul 05 '19 at 01:16
  • Is increasing "spark.memory.fraction" and reducing "spark.memory.storageFraction" safe? Spark Configuration does not recommend changing these from the default values. – bhavi Jul 10 '20 at 10:13
6

Try repartitioning. It worked in my case.

The dataframe was not so big at the very beginning, when it was loaded with read.csv(). The data file was about 10 MB, which might require, say, several hundred MB of memory in total for each processing task in an executor. I checked the number of partitions at the time: it was 2. Then it grew like a snowball during the following operations, joining with other tables and adding new columns. Then I ran into the memory-exceeding-limits issue at a certain step. I checked the number of partitions: it was still 2, derived from the original dataframe I guess. So I tried to repartition it at the very beginning, and there was no problem anymore.

I have not read much material about Spark and YARN yet. What I do know is that there are executors in nodes, and an executor can handle many tasks depending on the resources. My guess is that one partition is atomically mapped to one task, and its volume determines the resource usage. Spark cannot slice a partition if it grows too big.

A reasonable strategy is to determine the node and container memory first, either 10GB or 5GB. Ideally, either could serve any data processing job; it's just a matter of time. Given the 5GB memory setting, once you find a reasonable row count per partition, say 1000 after testing (one that won't fail any step during processing), we could do it as in the following pseudo code:

RWS_PER_PARTITION = 1000   # rows per partition that proved safe in testing
input_df = spark.read.csv("file_uri", *other_args)
total_rows = input_df.count()
original_num_partitions = input_df.rdd.getNumPartitions()
num_partitions = max(total_rows // RWS_PER_PARTITION, original_num_partitions)
input_df = input_df.repartition(num_partitions)

Hope it helps!

Mario Becerra
韦光正
1

I had the same issue on a small cluster running a relatively small job on Spark 2.3.1. The job reads a parquet file, removes duplicates using groupBy/agg/first, then sorts and writes a new parquet. It processed 51 GB of parquet files on 4 nodes (4 vcores, 32GB RAM).

The job was constantly failing on the aggregation stage. I wrote a bash script to watch executor memory usage and found out that in the middle of the stage one random executor starts taking double memory for a few seconds. When I correlated the time of this moment with the GC logs, it matched a full GC that frees a big amount of memory.

At last I understood that the problem is somehow related to GC. ParallelGC and G1 cause this issue constantly, but ConcMarkSweepGC improves the situation. The issue appears only with a small number of partitions. I ran the job on EMR where OpenJDK 64-Bit (build 25.171-b10) was installed. I don't know the root cause of the issue; it could be related to the JVM or the operating system. But it is definitely not related to heap or off-heap usage in my case.
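The answer doesn't show how the collector was switched; one common way to do it on EMR (an assumption on my part, not necessarily what the author did) is through the executor JVM options, e.g. via a spark-defaults classification:

[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.executor.extraJavaOptions": "-XX:+UseConcMarkSweepGC -XX:+PrintGCDetails"
    }
  }
]

Note that this replaces any default extraJavaOptions EMR may have configured, so it is worth checking the existing spark-defaults.conf on the cluster first.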

UPDATE1

Tried Oracle HotSpot, the issue is reproduced.

Avseiytsev Dmitriy