38

What will happen for large files in these cases?

1) Spark gets the location of the data from the NameNode. Will Spark stop at this point because, according to the information from the NameNode, the data is too large?

2) Spark partitions the data according to the DataNode block size, but not all of the data can be held in main memory. We are not using a StorageLevel here, so what will happen?

3) Spark partitions the data and some of it is held in main memory. Once the data in main memory has been processed, Spark loads the next portion of data from disk.

thebluephantom
Arpit Rai
  • 3
    How Spark handles large data files depends on what you are doing with the data after you read it in. If you call `cache` you will get an OOM, but if you are just doing a number of operations, Spark will automatically spill to disk when it fills up memory. If there is more data than will fit on disk in your cluster, the OS on the workers will typically kill the process and you will need to scale up on disk (or memory) to get the job to complete. – Glennie Helles Sindholt Oct 09 '17 at 05:48
  • Thanks for the guidance, but when Spark reads the data/file, it surely has to store the data it has read. So where does it store this data? And if it doesn't store it, what happens when the file is read? – Arpit Rai Oct 25 '17 at 08:46

2 Answers

98

First of all, Spark only starts reading in the data when an action (like count, collect or write) is called. Once an action is called, Spark loads in data in partitions - the number of concurrently loaded partitions depends on the number of cores you have available. So in Spark you can think of 1 partition = 1 core = 1 task. Note that all concurrently loaded partitions have to fit into memory, or you will get an OOM.
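To make the "1 partition = 1 core = 1 task" rule of thumb concrete, here is a small back-of-the-envelope sketch in plain Python (no Spark involved). The function `plan_read` and the figures used (a 100 GiB file, 128 MiB splits, 16 cores) are assumptions chosen for illustration, and the in-flight figure only counts raw input bytes, so treat it as a rough lower bound on the memory needed.

```python
def ceil_div(a: int, b: int) -> int:
    """Integer ceiling division, avoiding float rounding on large byte counts."""
    return -(-a // b)


def plan_read(file_bytes: int, split_bytes: int, total_cores: int) -> dict:
    """Estimate how a file is read under the 1 partition = 1 core = 1 task rule.

    Hypothetical helper for illustration only; it is not part of any Spark API.
    """
    partitions = ceil_div(file_bytes, split_bytes)
    # Only as many partitions as there are cores are loaded at the same time.
    concurrent = min(partitions, total_cores)
    # Number of rounds needed to work through all partitions.
    waves = ceil_div(partitions, total_cores)
    return {
        "partitions": partitions,
        "concurrent_tasks": concurrent,
        "waves": waves,
        "in_flight_bytes": concurrent * split_bytes,
    }


MiB = 1024 ** 2
GiB = 1024 ** 3

# Assumed example: 100 GiB input, 128 MiB splits, 16 cores in total.
plan = plan_read(100 * GiB, 128 * MiB, 16)
print(plan)
```

Under these assumptions the file becomes 800 partitions, processed 16 at a time in 50 rounds, with about 2 GiB of raw input in flight at any moment, which is far less than the 100 GiB file size.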

Assuming that you have several stages, Spark then runs the transformations from the first stage on the loaded partitions only. Once it has applied the transformations on the data in the loaded partitions, it stores the output as shuffle-data and then reads in more partitions. It then applies the transformations on these partitions, stores the output as shuffle-data, reads in more partitions and so forth until all data has been read.
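The loop described above can be imitated with a toy model in plain Python. This is only a sketch of the idea, not how Spark is implemented: `read_partitions`, `run_stage` and the `slots` parameter are hypothetical names, the "shuffle output" is just a list, and real Spark starts a new task as soon as a core frees up rather than working in strict rounds.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List, Tuple


def read_partitions(n_partitions: int, rows_per_partition: int) -> Iterator[List[int]]:
    """Lazily yield partitions; nothing is materialized until a consumer asks for it."""
    for p in range(n_partitions):
        start = p * rows_per_partition
        yield list(range(start, start + rows_per_partition))


def run_stage(
    partitions: Iterable[List[int]],
    transform: Callable[[int], int],
    slots: int,
) -> Tuple[List[int], int]:
    """Process partitions `slots` at a time and keep only a small output per partition.

    Returns the per-partition outputs (standing in for shuffle data) and the peak
    number of input rows that were held in memory at once.
    """
    shuffle_output: List[int] = []
    peak_rows = 0
    source = iter(partitions)
    while True:
        wave = list(islice(source, slots))  # load at most `slots` partitions
        if not wave:
            break
        peak_rows = max(peak_rows, sum(len(part) for part in wave))
        for part in wave:
            # Apply the stage's transformation and store the (smaller) result.
            shuffle_output.append(sum(transform(x) for x in part))
        # The loaded partitions go out of scope here before the next round is read.
    return shuffle_output, peak_rows


# Assumed example: 10 partitions of 1000 rows, 4 cores.
outputs, peak = run_stage(read_partitions(10, 1000), lambda x: 2 * x, slots=4)
print(len(outputs), peak)
```

The point of the sketch is that the peak number of rows held at once is bounded by `slots` times the partition size, regardless of how many partitions the input has in total.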

If you apply no transformation but only do, for instance, a count, Spark will still read in the data in partitions, but it will not store any data in your cluster, and if you do the count again it will read in all the data once again. To avoid reading in data several times, you might call cache or persist, in which case Spark will try to store the data in your cluster. On cache (which is the same as persist(StorageLevel.MEMORY_ONLY)) it will store all partitions in memory - if it doesn't fit in memory you will get an OOM. If you call persist(StorageLevel.MEMORY_AND_DISK) it will store as much as it can in memory and the rest will be put on disk. If the data doesn't fit on disk either, the OS will usually kill your workers.
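The difference between the storage levels can be sketched with a toy placement model in plain Python. Note that the RDD documentation quoted in the comments below describes MEMORY_ONLY as leaving partitions that do not fit uncached and recomputing them when needed, so the sketch follows that documented behavior. The function `place_partitions`, the greedy first-come placement and the unitless sizes are all assumptions for illustration; Spark's real block manager also evicts and re-admits blocks over time.

```python
from typing import Dict, List


def place_partitions(
    partition_sizes: List[int], memory_budget: int, level: str
) -> Dict[str, List[int]]:
    """Decide where each partition ends up under a simplified storage level model.

    Hypothetical helper, not a Spark API. Partitions are placed greedily in order.
    """
    if level not in ("MEMORY_ONLY", "MEMORY_AND_DISK", "DISK_ONLY"):
        raise ValueError(f"unsupported level in this sketch: {level}")

    placement: Dict[str, List[int]] = {"memory": [], "disk": [], "recompute": []}
    used = 0
    for idx, size in enumerate(partition_sizes):
        if level != "DISK_ONLY" and used + size <= memory_budget:
            placement["memory"].append(idx)
            used += size
        elif level in ("MEMORY_AND_DISK", "DISK_ONLY"):
            # Does not fit in memory (or memory is not used at all): keep it on disk.
            placement["disk"].append(idx)
        else:
            # MEMORY_ONLY: not cached, recomputed from the source when needed.
            placement["recompute"].append(idx)
    return placement


# Assumed example: four partitions of size 4 and room for 10 units in memory.
sizes = [4, 4, 4, 4]
for level in ("MEMORY_ONLY", "MEMORY_AND_DISK", "DISK_ONLY"):
    print(level, place_partitions(sizes, memory_budget=10, level=level))
```

In this model the first two partitions fit in memory; the other two are recomputed under MEMORY_ONLY and written to disk under MEMORY_AND_DISK, while DISK_ONLY keeps all four on disk.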

Note that Spark has its own little memory management system. Some of the memory that you assign to your Spark job is used to hold the data being worked on and some of the memory is used for storage if you call cache or persist.
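As a rough illustration of that split, the following plain-Python sketch estimates the regions of an executor heap. It assumes the unified memory model of recent Spark versions, with a reserve of about 300 MiB and the defaults `spark.memory.fraction = 0.6` and `spark.memory.storageFraction = 0.5`; these values are stated here from memory of the tuning documentation and should be checked against the version you run. The function `unified_memory` is a hypothetical helper, not a Spark API.

```python
RESERVED_BYTES = 300 * 1024 ** 2  # assumed fixed reserve, check your Spark version


def unified_memory(
    heap_bytes: int,
    memory_fraction: float = 0.6,   # assumed default of spark.memory.fraction
    storage_fraction: float = 0.5,  # assumed default of spark.memory.storageFraction
) -> dict:
    """Estimate how an executor heap is divided between execution and storage."""
    usable = heap_bytes - RESERVED_BYTES
    if usable <= 0:
        raise ValueError("heap is smaller than the assumed reserved memory")
    unified = int(usable * memory_fraction)   # shared by execution and storage
    storage = int(unified * storage_fraction)  # cached blocks protected from eviction
    return {
        "unified": unified,
        "storage_protected": storage,
        "execution_initial": unified - storage,
        "user": usable - unified,             # user data structures, metadata
    }


MiB = 1024 ** 2
GiB = 1024 ** 3

# Assumed example: an executor started with a 4 GiB heap.
regions = unified_memory(4 * GiB)
for name, size in regions.items():
    print(f"{name}: {size / MiB:.1f} MiB")
```

Under these assumptions, only a bit more than half of the heap is available for the data being worked on and for cached data together, which is why a dataset that "should fit" in executor memory often does not fit once cached.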

I hope this explanation helps :)

Glennie Helles Sindholt
  • 1
    For large datasets, is setting `StorageLevel.MEMORY_AND_DISK` the only option? – vijayraj34 Apr 20 '18 at 07:00
  • 1
    If there is more data in your dataset than will fit in memory alone, then you can use neither `cache` nor `persist(StorageLevel.MEMORY_ONLY)`. But you can use all the other StorageLevels, so `StorageLevel.DISK_ONLY` would for instance also be an option :) – Glennie Helles Sindholt Apr 20 '18 at 11:40
  • @GlennieHellesSindholt I know the context of the question, but don't you think that using `StorageLevel.DISK_ONLY` will affect performance? Or is horizontal scaling of the environment a better choice when processing a large file? – devesh Aug 20 '18 at 10:23
  • 3
    @devesh using `StorageLevel.DISK_ONLY` will definitely affect performance! I only mentioned it because the question was if `StorageLevel.MEMORY_AND_DISK` was the _only_ option, which it is not :) – Glennie Helles Sindholt Aug 20 '18 at 11:40
  • @GlennieHellesSindholt How large is the default in-memory partition size? Can it be configured? – ininprsr Apr 25 '19 at 15:13
  • 1
    Well, you can configure memory per executor via the `--executor-memory` flag (in combination with the spark.executor.memoryOverhead setting). The actual distribution of the assigned memory between the concurrently loaded tasks is managed by Spark internally, but you can probably assume a roughly even distribution. – Glennie Helles Sindholt Apr 27 '19 at 07:43
  • 1
    That isn't the case: Spark will recompute partitions if not all of the data fits in memory. From the documentation, at least, this is the latest behavior: _"MEMORY_ONLY: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level."_ – Nag May 05 '19 at 10:13
  • What will happen when an executor reads too many partitions and the stored shuffle data causes an OOM? Does this mean that Spark can only process data that fits entirely in memory when a shuffle is needed? – kuixiong Nov 04 '19 at 04:11
  • @GlennieHellesSindholt As you mentioned: "Note that all concurrently loaded partitions have to fit into memory, or you will get an OOM." Will Spark throw an OOM in this situation, or can it persist the partitions to disk? – anvy elizabeth Feb 14 '20 at 06:28
  • @GlennieHellesSindholt - Very well explained. I wanted to highlight one thing regarding `StorageLevel.MEMORY_AND_DISK`: this setting doesn't spill only the excess to disk. If the data doesn't fit in RAM, it spills everything to disk rather than keeping part in RAM and part on disk. – Dwarrior Jan 25 '22 at 14:57
  • @Dwarrior I do not believe you are right about that. _"StorageLevel.MEMORY_AND_DISK is the default behavior of the DataFrame or Dataset. In this Storage Level, The DataFrame will be stored in JVM memory as deserialized objects. When required storage is greater than available memory, it stores some of the excess partitions into a disk and reads the data from the disk when required"_ – Glennie Helles Sindholt Jan 27 '22 at 09:15
  • @GlennieHellesSindholt, after applying a transformation, where does the shuffle data get stored? After every stage, where does Spark store the data: in memory or on disk? – Bharath Ram Jul 13 '22 at 19:28
10

This is quoted directly from the Apache Spark FAQ (FAQ | Apache Spark):

Does my data need to fit in memory to use Spark?

No. Spark's operators spill data to disk if it does not fit in memory, allowing it to run well on any sized data. Likewise, cached datasets that do not fit in memory are either spilled to disk or recomputed on the fly when needed, as determined by the RDD's storage level.

In Apache Spark, if the data does not fit into memory, Spark either spills it to disk or recomputes it when needed, depending on the storage level.

The persist method in Apache Spark provides the following storage levels for persisting the data:

MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER (Java and Scala), MEMORY_AND_DISK_SER (Java and Scala), DISK_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK_2, OFF_HEAP.

The OFF_HEAP storage level is experimental.
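As a reading aid, the levels listed above can be described by a handful of flags. The table below is a plain-Python sketch that mirrors, to the best of my understanding, the flag combinations of the Scala/Java `StorageLevel` constants (use disk, use memory, use off-heap, keep deserialized, replication factor). The `Level` tuple and the `LEVELS` dictionary are illustrative names, not Spark classes, and the values should be checked against the `StorageLevel` source for your Spark version.

```python
from collections import namedtuple

# Illustrative stand-in for the fields of Spark's StorageLevel (not the real class).
Level = namedtuple("Level", "use_disk use_memory use_off_heap deserialized replication")

# Assumed flag combinations for the Scala/Java constants.
LEVELS = {
    "MEMORY_ONLY":         Level(False, True,  False, True,  1),
    "MEMORY_AND_DISK":     Level(True,  True,  False, True,  1),
    "MEMORY_ONLY_SER":     Level(False, True,  False, False, 1),
    "MEMORY_AND_DISK_SER": Level(True,  True,  False, False, 1),
    "DISK_ONLY":           Level(True,  False, False, False, 1),
    "MEMORY_ONLY_2":       Level(False, True,  False, True,  2),
    "MEMORY_AND_DISK_2":   Level(True,  True,  False, True,  2),
    "OFF_HEAP":            Level(True,  True,  True,  False, 1),
}


def can_fall_back_to_disk(name: str) -> bool:
    """True if partitions that do not fit in memory can be kept on disk instead."""
    return LEVELS[name].use_disk


# Levels that can hold more data than fits in memory without recomputation.
disk_capable = sorted(name for name in LEVELS if can_fall_back_to_disk(name))
print(disk_capable)
```

Read this way, the `_SER` variants trade CPU time for a more compact serialized form, the `_2` variants keep each partition on two nodes, and only the levels with the disk flag set can hold a dataset that is larger than the available memory without recomputing partitions.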

Jescanellas
Swadeshi