What is the difference between Spark checkpoint and persisting to disk? Are both stored on the local disk?
-
It is a very generic question. It would be better to add some context around it. To answer your question, it can be stored in any persistent storage area: local disk, HDFS, NFS-mounted space, etc. – Sumit Feb 01 '16 at 11:15
-
@Sumit - This is a very specific question about the differences between two Spark RDD methods. The answer can be objective and focused, as zero323's answer below demonstrates. – Nick Chammas Mar 24 '16 at 18:33
4 Answers
There are a few important differences, but the fundamental one is what happens with lineage. persist / cache keeps lineage intact, while checkpoint breaks lineage. Let's consider the following examples:
import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _)

cache / persist:

val indCache = rdd.mapValues(_ > 4)
indCache.persist(StorageLevel.DISK_ONLY)

indCache.toDebugString
// (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated]
//  |  ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated]
//  +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated]
//     |  ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]

indCache.count
// 3

indCache.toDebugString
// (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated]
//  |  CachedPartitions: 8; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 587.0 B
//  |  ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated]
//  +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated]
//     |  ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
checkpoint:

// A checkpoint directory must be set first, otherwise checkpoint throws a SparkException.
sc.setCheckpointDir("/tmp/spark-checkpoints") // illustrative path, not from the original session

val indChk = rdd.mapValues(_ > 4)
indChk.checkpoint

indChk.toDebugString
// (8) MapPartitionsRDD[11] at mapValues at <console>:24 []
//  |  ShuffledRDD[3] at reduceByKey at <console>:21 []
//  +-(8) MapPartitionsRDD[2] at map at <console>:21 []
//     |  ParallelCollectionRDD[1] at parallelize at <console>:21 []

indChk.count
// 3

indChk.toDebugString
// (8) MapPartitionsRDD[11] at mapValues at <console>:24 []
//  |  ReliableCheckpointRDD[12] at count at <console>:27 []
As you can see, in the first case the lineage is preserved even though the data is fetched from the cache. This means the data can be recomputed from scratch if some partitions of indCache are lost. In the second case the lineage is completely lost after the checkpoint, and indChk no longer carries the information required to rebuild it.
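The lineage difference can be illustrated without a cluster using a toy Python model. It is not Spark's API: `ToyRDD`, `compute`, `lineage`, `build` and the `CheckpointRDD` node name are all hypothetical, and `lineage()` is only a flattened stand-in for `toDebugString`. The model assumes, as the output above shows, that a checkpoint takes effect when the first action runs. At that point the upstream chain is replaced by a single node that reads the saved data.

```python
from typing import Callable, List, Optional


class ToyRDD:
    """Toy model of an RDD (NOT Spark's API): a node recomputes its rows by
    applying `fn` to its parent's rows unless the rows are already stored."""

    def __init__(self, name: str, parent: Optional["ToyRDD"], fn: Callable[[list], list]):
        self.name = name
        self.parent = parent
        self.fn = fn
        self.cache: Optional[list] = None
        self.wants_persist = False
        self.wants_checkpoint = False

    def persist(self) -> "ToyRDD":
        self.wants_persist = True
        return self

    def checkpoint(self) -> None:
        self.wants_checkpoint = True

    def compute(self) -> list:
        if self.cache is not None:
            return self.cache
        rows = self.fn(self.parent.compute() if self.parent is not None else [])
        if self.wants_persist:
            self.cache = rows  # rows are kept, but the parent chain is left intact
        if self.wants_checkpoint:
            saved = list(rows)  # stands in for the file in the checkpoint directory
            # The whole upstream chain is replaced by a node that only reads `saved`.
            self.parent = ToyRDD("CheckpointRDD", None, lambda _unused: saved)
            self.fn = lambda parent_rows: parent_rows  # parent now holds our own output
            self.wants_checkpoint = False
        return rows

    def lineage(self) -> List[str]:
        """Flattened analogue of toDebugString: this node, then its ancestors."""
        names, node = [], self
        while node is not None:
            names.append(node.name)
            node = node.parent
        return names


def reduce_by_key(pairs: list) -> list:
    """Sum values per key, like reduceByKey(_ + _), returning sorted pairs."""
    totals: dict = {}
    for key, value in pairs:
        totals[key] = totals.get(key, 0) + value
    return sorted(totals.items())


def build() -> ToyRDD:
    """Mirrors parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _).mapValues(_ > 4)."""
    source = ToyRDD("parallelize", None, lambda _unused: list(range(1, 11)))
    mapped = ToyRDD("map", source, lambda xs: [(x % 3, 1) for x in xs])
    reduced = ToyRDD("reduceByKey", mapped, reduce_by_key)
    return ToyRDD("mapValues", reduced, lambda kvs: [(k, v > 4) for k, v in kvs])


ind_cache = build().persist()
print(len(ind_cache.compute()))  # 3
print(ind_cache.lineage())       # ['mapValues', 'reduceByKey', 'map', 'parallelize']

ind_chk = build()
ind_chk.checkpoint()
print(len(ind_chk.compute()))    # 3
print(ind_chk.lineage())         # ['mapValues', 'CheckpointRDD']
```

The persisted node still reports its full chain, as in the CachedPartitions output. The checkpointed node only points at its saved copy.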
checkpoint, unlike cache / persist, is computed separately from other jobs. That's why an RDD marked for checkpointing should be cached:
It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.
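The recommendation quoted above can be illustrated with a toy Python sketch. The `CountingRDD` class and its methods are hypothetical and are not Spark's API. The model assumes, as described above, that the checkpoint write is a separate job. That job runs after the first action finishes and recomputes the RDD unless the RDD was cached.

```python
from typing import Callable, List, Optional


class CountingRDD:
    """Toy model (NOT Spark's API) of an RDD that counts how often it is computed.

    After an action finishes, a separate "checkpoint job" computes the RDD
    again to write it out, unless the rows were persisted during the action.
    """

    def __init__(self, fn: Callable[[], List[int]]):
        self.fn = fn
        self.runs = 0                                     # times `fn` was executed
        self.wants_persist = False
        self.wants_checkpoint = False
        self.cache: Optional[List[int]] = None
        self.checkpoint_file: Optional[List[int]] = None  # stands in for the saved file

    def persist(self) -> "CountingRDD":
        self.wants_persist = True
        return self

    def checkpoint(self) -> None:
        self.wants_checkpoint = True

    def _rows(self) -> List[int]:
        if self.checkpoint_file is not None:
            return self.checkpoint_file
        if self.cache is not None:
            return self.cache
        self.runs += 1
        rows = self.fn()
        if self.wants_persist:
            self.cache = rows
        return rows

    def count(self) -> int:
        result = len(self._rows())  # job 1: the action the user asked for
        if self.wants_checkpoint and self.checkpoint_file is None:
            # job 2: the separate checkpoint job; it reuses the cache if there is one
            self.checkpoint_file = list(self._rows())
        return result


def expensive() -> List[int]:
    return [x * x for x in range(1_000)]


uncached = CountingRDD(expensive)
uncached.checkpoint()
uncached.count()
print(uncached.runs)  # 2

cached = CountingRDD(expensive).persist()
cached.checkpoint()
cached.count()
print(cached.runs)    # 1
```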
Finally, checkpointed data is persistent and is not removed after the SparkContext is destroyed.
Regarding data storage: SparkContext.setCheckpointDir, used by RDD.checkpoint, requires a DFS path when running in non-local mode. Otherwise it can be a local file system path as well. localCheckpoint and persist without replication use the local file system.
Important Note:
RDD checkpointing is a different concept from checkpointing in Spark Streaming. The former is designed to address the lineage issue, while the latter is about streaming reliability and failure recovery.

-
A little bit confused here. Suppose I have a long computation chain and I decide to break it by assigning it to a variable and caching it. I then read that variable and go forward. How is that different from checkpointing, apart from the recomputation aspect? My real question is: in what situation should we go for checkpointing instead of the process above, given that the checkpointed RDD will not be used in the future? Clarification would be helpful :) – Mrinal Apr 03 '18 at 06:07
-
Another important difference is that if you `persist` / `cache` an RDD and dependent RDDs later need to be calculated, Spark automatically uses the persisted/cached RDD content to speed things up. But if you just `checkpoint` the same RDD, it won't be used when calculating dependent RDDs. I wonder when a checkpointed RDD is used by Spark. If there's a failure, will Spark use it *automatically*? Or am I supposed to `spark.read` it manually to get the RDD to continue from? That would explain why Spark never deletes it. So, how are checkpoints supposed to be used? – ddekany Aug 07 '18 at 14:37
-
What is the recommendation for DStream with CustomReceiver implementations? Should they be checkpointed as is the norm for Kafka receivers or should they be persisted? – user1384205 Feb 22 '19 at 14:49
-
@Mrinal - re: in what situation should checkpointing be used? Checkpoints hang around even after the job is finished, while persisted-to-disk blocks might be cleaned up. One use for checkpoints that I have read about is a flaky, long-running job that sometimes fails in a busy cluster. Checkpointing can help you recover faster because on restart you don't have to go all the way back to the beginning of the job. – Chris Bedford Jul 15 '19 at 20:07
-
@ddekany - the response above might be interesting to you too! Hope it is useful ;^) – Chris Bedford Jul 15 '19 at 20:08
-
Thanks @Chris. Just for the sake of discussion, I have noticed situations where the dataframe was being used only once, but after using checkpoint, the job finished relatively faster (like around 40% faster in a recent encounter). – Mrinal Jul 16 '19 at 09:32
-
Regarding your "Important note", the Spark documentation does not say that. https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing In fact, at one of the Spark Summit conferences, the presenter said that checkpointing helps to break the lineage, which would otherwise keep growing over time. Could you please give me a reference to support your point? – Monu Oct 25 '20 at 20:15
-
So, in which case should we use checkpoint? It seems to be just like saving an RDD and loading it back. Is it the same? – DennisLi Mar 21 '21 at 09:09
-
@ddekany "If there's a failure, will Spark use it automatically? Or am I supposed to spark.read it manually to get the RDD to continue from?" Yes, Spark will automatically read the data from the checkpoint. From my perspective, breaking the lineage with the parent RDD means you don't need to recompute from the source, which saves a lot of time. – Felix Feng Jun 01 '22 at 03:48
I think you can find a very detailed answer here.
While it is very hard to summarize everything on that page, I would say:
Persist
- Persisting or caching with StorageLevel.DISK_ONLY causes the RDD to be computed and stored in a location such that subsequent uses of that RDD will not go beyond that point when recomputing the lineage.
- After persist is called, Spark still remembers the lineage of the RDD even though it doesn't use it.
- After the application terminates, the cache is cleared and its files are destroyed.
Checkpointing
- Checkpointing stores the RDD physically in HDFS (or whichever checkpoint directory is configured) and destroys the lineage that created it.
- The checkpoint files are not deleted even after the Spark application terminates.
- Checkpoint files can be used in subsequent job runs or driver programs.
- Checkpointing an RDD causes double computation unless the RDD is cached first. The RDD is computed once by the action that triggers it, and then again by the separate job that writes it to the checkpoint directory.
You may want to read the article for more details on the internals of Spark's checkpointing and cache operations.

persist(MEMORY_AND_DISK) stores the data frame in memory and on disk temporarily without breaking the lineage of the program, i.e. df.rdd.toDebugString() returns the same output. It is recommended to use persist(*) on a calculation that is going to be reused, to avoid recalculating intermediate results:
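from pyspark import StorageLevel

df = df.persist(StorageLevel.MEMORY_AND_DISK)
calculation1(df)  # calculation1 / calculation2 stand for any computations that reuse df
calculation2(df)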
Note that caching the data frame does not guarantee that it will remain in memory until you use it next time. Depending on memory usage, the cache can be discarded.
checkpoint(), on the other hand, breaks the lineage and forces the data frame to be stored on disk. Unlike cache()/persist(), frequent checkpointing can slow down your program. Checkpoints are recommended in two cases:
a) when working in an unstable environment, to allow fast recovery from failures;
b) when storing intermediate states of a calculation where new entries of the RDD depend on previous entries, i.e. to avoid recalculating a long dependency chain in case of failure.
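Case b) can be illustrated with a toy Python model of an iterative job in which each state depends on the previous one. `Step`, `run` and `checkpoint_every` are hypothetical names, not Spark's API. The only assumption carried over from Spark is that a checkpointed state no longer points at its history, so a rebuild after a failure starts from the saved copy.

```python
from typing import List, Optional


class Step:
    """Toy node (NOT Spark's API): one iteration's state plus a pointer to its parent."""

    def __init__(self, parent: Optional["Step"], value: int):
        self.parent = parent
        self.value = value

    def lineage_length(self) -> int:
        """Number of nodes that would be replayed to rebuild this state after a failure."""
        length, node = 0, self
        while node is not None:
            length += 1
            node = node.parent
        return length


def run(iterations: int, checkpoint_every: Optional[int] = None) -> List[int]:
    """Iterate a state that depends on the previous one; return lineage length per step."""
    state = Step(None, 0)
    lengths = []
    for i in range(1, iterations + 1):
        state = Step(state, state.value + i)  # new entries depend on previous entries
        if checkpoint_every and i % checkpoint_every == 0:
            state.parent = None  # the saved copy now stands in for the whole history
        lengths.append(state.lineage_length())
    return lengths


print(run(6))                      # [2, 3, 4, 5, 6, 7]
print(run(6, checkpoint_every=3))  # [2, 3, 1, 2, 3, 1]
```

Without checkpointing, the chain that a failure would replay grows with every iteration. With a checkpoint every three iterations it stays bounded, at the cost of writing the data out each time.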

If you check the relevant part of the documentation, it talks about writing data to a reliable system, e.g. HDFS. But it is up to you to tell Apache Spark where to write its checkpoint information.
On the other hand, persisting is about caching data mostly in memory, as this part of the documentation clearly indicates.
So, it depends on what directory you gave to Apache Spark.

-
Persistence in streaming is quite a different issue and not really related to caching. – zero323 Jun 30 '16 at 21:04