18

I'm new to Spark, and the documentation says Spark will load data into memory to make iterative algorithms faster.

But what if I have a 10GB log file and only 2GB of memory? Will Spark still load the whole log file into memory?

Sean Owen
WoooHaaaa

3 Answers

17

I think this question is well answered by the FAQ on the Spark website (https://spark.apache.org/faq.html):

  • What happens if my dataset does not fit in memory? Often each partition of data is small and does fit in memory, and these partitions are processed a few at a time. For very large partitions that do not fit in memory, Spark's built-in operators perform external operations on datasets.
  • What happens when a cached dataset does not fit in memory? Spark can either spill it to disk or recompute the partitions that don't fit in RAM each time they are requested. By default, it uses recomputation, but you can set a dataset's storage level to MEMORY_AND_DISK to avoid this.
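To make the second point concrete, here is a minimal Scala sketch (app name and file path are made up) of the difference between the default cache() and an explicit MEMORY_AND_DISK storage level:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object CacheSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cache-sketch").setMaster("local[*]"))

    // Hypothetical 10GB log file; substitute your own path.
    val logs = sc.textFile("hdfs:///data/big.log")

    // cache() == persist(StorageLevel.MEMORY_ONLY): partitions that don't fit
    // in RAM are simply not kept and are recomputed on the next action.
    // MEMORY_AND_DISK instead spills those partitions to local disk.
    val errors = logs.filter(_.contains("ERROR"))
                     .persist(StorageLevel.MEMORY_AND_DISK)

    println(errors.count()) // first action materializes and caches/spills partitions
    println(errors.count()) // second action reuses them instead of re-reading the file

    sc.stop()
  }
}
```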
Kehe CAI
11

The key here is that RDDs are split into partitions (see the end of this answer for how), and each partition is a set of elements (text lines or integers, for instance). Partitions are used to parallelize computation across different computational units.

So the question is not whether a file is too big, but whether a partition is. For that case, the FAQ says: "Spark's operators spill data to disk if it does not fit in memory, allowing it to run well on any sized data". So the problem of large partitions causing OOM is handled there.

Now, even if a partition fits in memory, that memory may already be full. In that case Spark evicts another partition from memory to make room for the new one. Evicting can mean either of the following:

  1. Dropping the partition completely: in that case, if the partition is required again, it is recomputed.
  2. Persisting the partition at the storage level specified: each RDD can be "marked" to be cached/persisted at one of these storage levels (see this for how); a short sketch follows this list.
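As a rough illustration of those two cases (assuming an existing SparkContext `sc` and a placeholder path), this sketch marks two RDDs with different storage levels and shows how to inspect and release them:

```scala
import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("/data/big.log") // placeholder path

// Case 1: MEMORY_ONLY (what cache() uses). An evicted partition is simply
// dropped and recomputed from its lineage when requested again.
val upper = lines.map(_.toUpperCase).persist(StorageLevel.MEMORY_ONLY)

// Case 2: MEMORY_AND_DISK. An evicted partition is written to local disk
// and read back later instead of being recomputed.
val lower = lines.map(_.toLowerCase).persist(StorageLevel.MEMORY_AND_DISK)

println(upper.getStorageLevel)      // inspect the level an RDD was marked with
println(sc.getPersistentRDDs.size)  // RDDs currently marked as persistent

upper.unpersist()                   // remove the mark and free the cached blocks
```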

Memory management is well explained here: "Spark stores partitions in LRU cache in memory. When cache hits its limit in size, it evicts the entry (i.e. partition) from it. When the partition has “disk” attribute (i.e. your persistence level allows storing partition on disk), it would be written to HDD and the memory consumed by it would be freed, unless you would request it. When you request it, it would be read into the memory, and if there won’t be enough memory some other, older entries from the cache would be evicted. If your partition does not have “disk” attribute, eviction would simply mean destroying the cache entry without writing it to HDD".
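If you want to influence this behaviour, recent Spark versions (1.6+, with unified memory management) expose a few configuration knobs; the values below are only illustrative defaults, not recommendations:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("memory-tuning-sketch")
  // Fraction of the heap shared by execution and storage (default 0.6).
  .set("spark.memory.fraction", "0.6")
  // Portion of that space protected from eviction by execution (default 0.5).
  .set("spark.memory.storageFraction", "0.5")
  // Local directory used for spilled partitions and shuffle files.
  .set("spark.local.dir", "/tmp/spark-scratch")
```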

How the initial file/data is partitioned depends on the format and type of the data, as well as on the function used to create the RDD (see this). For instance (a short sketch follows this list):

  • If you already have a collection (a Java List, for example), you can use parallelize() and specify the number of partitions. Elements of the collection will be grouped into partitions.
  • If using an external file in HDFS: "Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS)".
  • If reading from a local text file, each line (terminated by a newline "\n"; the end-of-line character can be changed, see this) is an element, and several lines form a partition.
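A small sketch of those three cases (paths and numbers are made up; assumes an existing SparkContext `sc`):

```scala
// 1. From an in-memory collection: ask for an explicit number of partitions.
val nums = sc.parallelize(1 to 1000000, 8)      // 8 partitions
println(nums.getNumPartitions)                  // -> 8

// 2. From a file in HDFS: roughly one partition per 128MB block by default,
//    but you can request a minimum number of partitions.
val hdfsLogs = sc.textFile("hdfs:///data/big.log", 100)
println(hdfsLogs.getNumPartitions)

// 3. From a local text file: each element is one line.
val localLines = sc.textFile("file:///tmp/small.log")
println(localLines.first())                     // first line of the file
```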

Finally, I suggest reading this for more information and for help deciding how to choose the number of partitions (too many or too few?).
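On that last point, the number of partitions can also be changed after the RDD is created; a minimal sketch (assuming `rdd` is any existing RDD):

```scala
// Change the number of partitions; repartition() triggers a full shuffle.
val wider = rdd.repartition(200)

// Reduce the number of partitions, avoiding a full shuffle where possible.
val narrower = rdd.coalesce(10)

println(wider.getNumPartitions, narrower.getNumPartitions)
```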

Marc Cayuela
0

It will not load the full 10GB, as you don't have enough memory available. In my experience, one of three things will happen, depending on how you use your data:

If you are trying to cache the 10GBs:

  1. You will get an OOME, or
  2. Only as much of the data as fits in memory will actually be cached

If you are just processing the data:

  1. The data will be swapped in and out of memory as it is processed

Of course, this is highly dependent on your code and the transformations you are applying.
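For the "just processing" case, a minimal sketch (placeholder path, assuming an existing SparkContext `sc`) of what that looks like: with no persist() call, Spark pulls a few partitions at a time through the available 2GB and discards them as each task finishes:

```scala
val logs = sc.textFile("hdfs:///data/big.log")   // 10GB file, never fully in memory

// Lazy transformation: nothing is read yet.
val errors = logs.filter(_.contains("ERROR"))

// The action streams partitions through memory one task at a time.
println(errors.count())
```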

Ioannis Deligiannis
    If your dataset has enough partitions such that individual partitions can fit in memory even though all partitions may not, calling cache() should cause a portion of the partitions to be cached, while the rest will be recomputed from lineage (or read from disk, if you persisted the RDD at the [MEMORY_AND_DISK storage level](https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#rdd-persistence)). I think Spark's caching policy is smart enough to never evict blocks of an RDD in order to cache new blocks from that same RDD. – Josh Rosen Dec 06 '13 at 01:41