
I am working on a Spark ML pipeline where we get OOM Errors on larger data sets. Before training we were using cache(); I swapped this out for checkpoint() and our memory requirements went down significantly. However, in the docs for RDD's checkpoint() it says:

It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.

The same guidance is not given for Dataset's checkpoint, which is what I am using. Following the above advice anyway, I found that the memory requirements actually increased slightly compared with using cache() alone.

My expectation was that when we do

...
ds.cache()
ds.checkpoint()
...

the call to checkpoint forces evaluation of the Dataset, which is cached at the same time before being checkpointed. Afterwards, any reference to ds would use the cached partitions, and if more memory is required and the partitions are evicted, the checkpointed partitions would be used rather than re-evaluating them. Is this true, or does something different happen under the hood? Ideally I'd like to keep the Dataset in memory if possible, but it seems there is no benefit whatsoever, from a memory standpoint, to using the cache and checkpoint approach.

user10938362
oirectine

1 Answer


TL;DR You won't benefit from the in-memory cache in subsequent actions (the default storage level for Dataset is MEMORY_AND_DISK anyway), but you should still consider caching if computing ds is expensive.

Explanation

Your expectation that

ds.cache()
ds.checkpoint()
...

the call to checkpoint forces evaluation of the DataSet

is correct. Dataset.checkpoint comes in different flavors, which allow for both eager and lazy checkpointing, and the default variant is eager:

def checkpoint(): Dataset[T] = checkpoint(eager = true, reliableCheckpoint = true)

Therefore subsequent actions should reuse checkpoint files.
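As a rough illustration of the eager default, here is a minimal sketch of the cache-then-checkpoint pattern. The local master, the checkpoint path, and the stand-in Dataset are assumptions made for the example, not part of the question. Note that checkpoint() returns a new Dataset, so it is the returned value that reads from the checkpoint files.

```scala
import org.apache.spark.sql.SparkSession

object CacheThenCheckpoint {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session, purely for illustration.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("cache-then-checkpoint")
      .getOrCreate()

    // A checkpoint directory has to be set before checkpoint() is called.
    // The path below is a placeholder.
    spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

    // Hypothetical stand-in for a Dataset that is expensive to compute.
    val ds = spark.range(0L, 1000000L).selectExpr("id * 2 AS doubled")

    // cache() is lazy: it only marks ds for caching.
    ds.cache()

    // checkpoint() is eager by default, so a job runs here. It returns a
    // new Dataset backed by the checkpoint files; ds itself keeps its lineage.
    val checkpointed = ds.checkpoint()

    // Actions on the returned Dataset should reuse the checkpoint files.
    println(checkpointed.count())

    // Optional: once the checkpoint exists, the cached copy of ds can be
    // released if nothing else still refers to ds.
    ds.unpersist()

    spark.stop()
  }
}
```

If the return value of checkpoint() is discarded and later code keeps using ds, those later actions go through ds (and its cache, if any) rather than through the checkpoint files.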

However, under the covers Spark simply applies checkpoint to the internal RDD, so the rules of evaluation haven't changed. Spark evaluates the action first, and then creates the checkpoint (that's why caching was recommended in the first place).

So if you omit ds.cache(), ds will be evaluated twice in ds.checkpoint(): once for the action that forces evaluation, and once more to write the checkpoint.
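A minimal sketch of how that double evaluation could be observed at the RDD level, assuming a local session and a placeholder checkpoint directory. The helper `evaluations` and the accumulator name are hypothetical, introduced only for this example. The accumulator counts how many times the map function runs, so without cache() one would expect roughly twice as many invocations as with it.

```scala
import org.apache.spark.sql.SparkSession

object CheckpointRecomputation {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session, purely for illustration.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("checkpoint-recomputation")
      .getOrCreate()
    val sc = spark.sparkContext

    // Placeholder path for the checkpoint files.
    sc.setCheckpointDir("/tmp/spark-checkpoints")

    // Hypothetical helper: counts how often the map function is invoked
    // while an RDD is evaluated by an action and then checkpointed.
    def evaluations(useCache: Boolean): Long = {
      val counter = sc.longAccumulator("evaluated elements")
      val rdd = sc.parallelize(1 to 1000, 4).map { x => counter.add(1); x }
      if (useCache) rdd.cache()
      rdd.checkpoint() // only marks the RDD for checkpointing; nothing runs yet
      rdd.count()      // the action runs first, then the checkpoint is written
      counter.value
    }

    println(s"without cache: ${evaluations(useCache = false)} map invocations")
    println(s"with cache:    ${evaluations(useCache = true)} map invocations")

    spark.stop()
  }
}
```

Accumulator updates made inside transformations can be applied more than once if tasks are retried, so the counts are indicative rather than exact.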

Therefore nothing changed and cache is still recommended, although the recommendation might be slightly weaker compared to a plain RDD, as Dataset cache is considered computationally expensive and, depending on the context, it might be cheaper to simply reload the data (note that Dataset.count without cache is normally optimized, while Dataset.count with cache is not - Any performance issues forcing eager evaluation using count in spark?).

user10938362
  • In "Dataset.count without cache is normally optimized, while Dataset.count without it, is not" -> the last should be **"with it"** and not **"without it"**, right? – bonnal-enzo Mar 28 '20 at 18:22