4

I read some documents about Spark memory management.

On the page "What will Spark do if I don't have enough memory?", it says:

Spark stores partitions in LRU cache in memory. When cache hits its limit in size, it evicts the entry (i.e. partition) from it. When the partition has “disk” attribute (i.e. your persistence level allows storing partition on disk), it would be written to HDD and the memory consumed by it would be freed, unless you would request it. When you request it, it would be read into the memory, and if there won’t be enough memory some other, older entries from the cache would be evicted. If your partition does not have “disk” attribute, eviction would simply mean destroying the cache entry without writing it to HDD.

If partitions can spill to disk when there is not enough memory, how can an out-of-memory error still happen while Spark is running?
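To make the quoted behaviour concrete, here is a minimal sketch of an LRU cache whose entries may or may not carry a "disk" attribute. It is a toy model in plain Python, not Spark's actual block manager; the class name `ToyBlockCache` and its methods are hypothetical and exist only for illustration.

```python
from collections import OrderedDict


class ToyBlockCache:
    """Toy LRU cache modelled on the quoted description (not Spark code)."""

    def __init__(self, capacity_mb):
        self.capacity_mb = capacity_mb
        self.memory = OrderedDict()  # block_id -> (size_mb, use_disk), oldest first
        self.disk = {}               # block_id -> size_mb, blocks written to disk
        self.dropped = []            # blocks destroyed without a disk copy

    def used_mb(self):
        return sum(size for size, _ in self.memory.values())

    def _evict(self, block_id, size_mb, use_disk):
        # Entries with the "disk" attribute are written out, others are destroyed.
        if use_disk:
            self.disk[block_id] = size_mb
        else:
            self.dropped.append(block_id)

    def put(self, block_id, size_mb, use_disk):
        if size_mb > self.capacity_mb:
            # The block can never fit in memory: treat it as evicted right away.
            self._evict(block_id, size_mb, use_disk)
            return
        while self.used_mb() + size_mb > self.capacity_mb:
            # Evict the least recently used entry to make room.
            victim, (victim_size, victim_disk) = self.memory.popitem(last=False)
            self._evict(victim, victim_size, victim_disk)
        self.memory[block_id] = (size_mb, use_disk)

    def get(self, block_id):
        if block_id in self.memory:
            self.memory.move_to_end(block_id)  # mark as most recently used
            return "memory"
        if block_id in self.disk:
            # Read back into memory, possibly evicting older entries.
            self.put(block_id, self.disk.pop(block_id), True)
            return "disk"
        return "recompute"  # no cached copy is left anywhere
```

Note that nothing in this model can run out of memory: the cache only ever gives memory back, which is exactly why the question arises.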

Shaido
Jason Zheng

3 Answers

5

Spark can evict only cached RDD blocks, that is, RDDs that the application has marked for storage in memory. So the storage portion of memory can be cleared, but the execution portion cannot. Spark Memory Management states that:

Execution memory refers to that used for computation in shuffles, joins, sorts and aggregations.

And, on whether execution memory can be evicted:

Storage may not evict execution due to complexities in implementation.

If the memory available to the JVM is smaller than the execution memory required, an OOM error is bound to happen.
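The argument above can be sketched as a small model: execution may push cached blocks out, storage may not push execution out, and a request that exceeds what can be reclaimed fails. This is a deliberately simplified illustration, assuming a single shared pool; the class `ToyUnifiedMemory` is hypothetical, and it ignores both the protected storage region (`spark.memory.storageFraction`) and the fact that many operators spill to disk instead of failing.

```python
class ToyUnifiedMemory:
    """Simplified shared pool for storage and execution (not Spark code)."""

    def __init__(self, total_mb):
        self.total_mb = total_mb
        self.storage_mb = 0
        self.execution_mb = 0

    def free_mb(self):
        return self.total_mb - self.storage_mb - self.execution_mb

    def cache_block(self, size_mb):
        # Storage may only use free memory; it never evicts execution.
        if size_mb > self.free_mb():
            return False
        self.storage_mb += size_mb
        return True

    def acquire_execution(self, size_mb):
        # Execution may use free memory plus whatever storage can give up.
        reclaimable = self.free_mb() + self.storage_mb
        if size_mb > reclaimable:
            raise MemoryError("execution needs more than the pool can provide")
        shortfall = size_mb - self.free_mb()
        if shortfall > 0:
            self.storage_mb -= shortfall  # evict cached blocks
        self.execution_mb += size_mb
```

In this model, once execution holds the whole pool there is nothing left to evict, which is the situation the answer describes.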

mkhan
  • 2
    >>"Thus storage portion of the memory can be cleared but not the execution portion" I politely challenge this statement. The document you referred to says that storage can't evict execution; instead, storage blocks are evicted on an LRU basis. But if all the executor memory is occupied by execution data, then further execution data can be spilled to disk. As per https://www.youtube.com/watch?v=dPHrykZL8Cg (at 7:11), execution data can also be spilled to disk. – Vikash Pareek Jun 11 '20 at 17:19
  • @VikashPareek What if the executor is handling a shuffle spill and the K-V pair it is currently processing is too large to hold in memory before it can even be spilled to disk? I believe this could cause an OOM. I am also looking for answers to this question, so please share your views. – void Oct 29 '20 at 16:54
2
There could be several explanations for this issue.
1. User Memory
Storage memory and execution memory can spill to disk (storage only when the persistence level allows it), but user memory cannot.
This means that if user memory exceeds its limit, an OOM error is thrown.

User Memory:

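The part of the executor heap left over after Spark's own region: (heap - 300 MB reserved) × (1 - spark.memory.fraction). That is 25% with the Spark 1.6 default of 0.75 and 40% with the default of 0.6 used since Spark 2.0. This section stores user-defined data objects (e.g., hash maps and other structures used by UDFs) that are needed for RDD transformations. User memory is not managed by Spark, so Spark cannot spill or evict anything held there.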

2. YARN Memory Overhead
If you are running Spark on YARN, you should configure

spark.yarn.executor.memoryOverhead

(named spark.executor.memoryOverhead since Spark 2.3). The YARN memory overhead holds Spark internal objects, language-specific objects, thread stacks and NIO buffers. If an executor uses more than its heap plus this overhead, YARN kills the container.

3. Driver Memory
When you run rdd.collect(), data from all the executors is sent to the driver. The driver merges it into a single object, which may be too big to fit in the driver's memory.
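The sizes behind the User Memory and YARN Memory Overhead items can be worked out with a little arithmetic. The sketch below assumes the documented defaults (300 MB reserved, `spark.memory.fraction` = 0.6, `spark.memory.storageFraction` = 0.5, overhead = max(384 MB, 10% of executor memory)); the function names are hypothetical, and the real values also depend on integer rounding and YARN's allocation increments, which are ignored here.

```python
RESERVED_MB = 300  # fixed amount Spark sets aside before splitting the heap


def executor_memory_regions(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Approximate split of an executor heap, in MB (simplified model)."""
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction  # shared by execution and storage
    return {
        "reserved": RESERVED_MB,
        "user": usable - unified,  # not managed by Spark, cannot spill
        "unified": unified,
        "storage_protected": unified * storage_fraction,
    }


def yarn_container_mb(heap_mb, overhead_factor=0.10, min_overhead_mb=384):
    """Approximate container size YARN must grant: heap plus memory overhead."""
    overhead = max(min_overhead_mb, heap_mb * overhead_factor)
    return heap_mb + overhead


if __name__ == "__main__":
    # Example: a 4 GB executor heap with default settings.
    for name, size in executor_memory_regions(4096).items():
        print(f"{name}: {size:.1f} MB")
    print(f"container: {yarn_container_mb(4096):.1f} MB")
```

Passing `memory_fraction=0.75` reproduces the 25% user-memory figure from the older default.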
1

Let's understand this with an example: consider a 10 GB CSV file, 1 GB of executor memory, and an HDFS block size of 128 MB.

Case 1: a 10 GB file where each record (i.e., row/line) is 1 MB, giving approx. 10K records.

There would be approx. 80 blocks. For this example, assume the records line up with block boundaries, so that no single record spans two blocks (HDFS itself splits on bytes, not on records).

In this case, the file is read in 128 MB parts, each obviously smaller than the 1 GB of executor memory; the InputSplit also does not have to access multiple blocks.

Hence the file will be processed smoothly without OOM.

Case 2: a 10 GB file where each record is 1.5 GB, giving approx. 6-7 records.

Again there would be approx. 80 blocks, but now the blocks are linked, because a record that starts in one block spills over into the following ones. To read one record you have to hold data from 12 blocks at once.

When Spark reads the first 128 MB block, it sees (via the InputSplit) that the record is not finished, so it has to read the second block as well, and it continues up to the 8th block (1024 MB). When it tries to read the 9th block, the record no longer fits into 1 GB of memory, and Spark throws an OOM exception.
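The numbers in both cases are easy to check. The following is a small sketch of the arithmetic only, with hypothetical helper names; it does not model how Spark or the InputSplit actually reads data.

```python
import math


def block_count(file_mb, block_mb=128):
    """Number of HDFS blocks needed to store a file."""
    return math.ceil(file_mb / block_mb)


def blocks_per_record(record_mb, block_mb=128):
    """Minimum number of blocks a single record is spread across."""
    return math.ceil(record_mb / block_mb)


def full_blocks_that_fit(executor_mb, block_mb=128):
    """How many whole blocks fit in executor memory before it runs out."""
    return executor_mb // block_mb


def record_fits(record_mb, executor_mb):
    """A record must be held in memory as a whole to be processed."""
    return record_mb <= executor_mb


if __name__ == "__main__":
    print(block_count(10 * 1024))         # blocks in the 10 GB file
    print(blocks_per_record(1))           # Case 1: 1 MB records
    print(blocks_per_record(1.5 * 1024))  # Case 2: 1.5 GB records
    print(full_blocks_that_fit(1024))     # blocks read before memory is full
    print(record_fits(1.5 * 1024, 1024))  # does a Case 2 record fit?
```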

Can a single record really be 1.5 GB? The best examples are XML files or gzip-compressed CSV files, which are not splittable.

To conclude: it's not only the file size or the executor size; the record size and whether the file format is splittable also have to be taken into consideration!

Abhishek