8

I'm currently using

val df=longLineageCalculation(....)
val newDf=sparkSession.createDataFrame(df.rdd, df.schema)
newDf.join......

In order to save time when calculating plans, however docs say that checkpointing is the suggested way to "cut" lineage. BUT I don't want to pay the price of saving the RDD to disk.

My process is a batch process which is not-so-long and can be restarted without issues, so checkpointing is not benefit for me (I think).

What are the problems which can arise using "my" method? (Docs suggests checkpointing, which is more expensive, instead of this one for breaking lineages and I would like to know the reason)

Only think I can guess is that if some node fails after my "lineage breaking" maybe my process will fail while the checkpointed one would have worked correctly? (what If the DF is cached instead of checkpointed?)

Thanks!

EDIT:

From SMaZ answer, my own knowledge and the article which he provided. Using createDataframe (which is a Dev-API, so use at "my"/your own risk) will keep the lineage in memory (not a problem for me since I don't have memory problems and the lineage is not big).

With this, it looks (not tested 100%) that Spark should be able to rebuild whatever is needed if it fails.

As I'm not using the data in the following executions, I'll go with cache+createDataframe versus checkpointing (which If i'm not wrong, is actually cache+saveToHDFS+"createDataFrame").

My process is not that critical (if it crashes) since an user will be always expecting the result and they launch it manually, so if it gives problems, they can relaunch (+Spark will relaunch it) or call me, so I can take some risk anyways, but I'm 99% sure there's no risk :)

Community
  • 1
  • 1
BiS
  • 501
  • 4
  • 17
  • Have you looked at this [question](https://stackoverflow.com/questions/52556798/spark-iterative-recursive-algorithms-breaking-spark-lineage)? – LizardKing Sep 02 '19 at 10:12
  • Yeah I did, but the answer is not there, I want to know the drawbacks of this method since official docs suggest the slower method. – BiS Sep 02 '19 at 10:19
  • Although re-reading it, it looks like checkpointing is needed for iterative reads, this method should be fine for independent reads (from Garren's comment). However I'm still curious about what happens if something crashes after the "InMemoryTable" provided by the createDataFrame was already calculated (is it cached or maybe Spark is just capable of rebuilding it anyways?). – BiS Sep 02 '19 at 19:55
  • @BiS: What do you mean by `my method`? You mean creating with `sparkSession.createDataFrame`? – SMaZ Sep 06 '19 at 02:26
  • @SMaZ yeah, creating it with dataframe with no checkpoint – BiS Sep 06 '19 at 07:26
  • @BiS : Ok, I will give detailed answer over the weekend. – SMaZ Sep 06 '19 at 14:31

2 Answers2

7

Let me start with creating dataframe with below line :

val newDf=sparkSession.createDataFrame(df.rdd, df.schema)

If we take close look into SparkSession class then this method is annotated with @DeveloperApi. To understand what this annotation means please take a look into below lines from DeveloperApi class

A lower-level, unstable API intended for developers.

Developer API's might change or be removed in minor versions of Spark.

So it is not advised to use this method for production solutions, called as Use at your own risk implementation in open source world.

However, Let's dig deeper what happens when we call createDataframe from RDD. It is calling the internalCreateDataFrame private method and creating LogicalRDD.

LogicalRDD is created when:

  • Dataset is requested to checkpoint
  • SparkSession is requested to create a DataFrame from an RDD of internal binary rows

So it is nothing but the same as checkpoint operation without saving the dataset physically. It is just creating DataFrame From RDD Of Internal Binary Rows and Schema. This might truncate the lineage in memory but not at the Physical level.

So I believe it's just the overhead of creating another RDDs and can not be used as a replacement of checkpoint.

Now, Checkpoint is the process of truncating lineage graph and saving it to a reliable distributed/local file system.

Why checkpoint?

  • If computation takes a long time or lineage is too long or Depends too many RDDs

  • Keeping heavy lineage information comes with the cost of memory.

  • The checkpoint file will not be deleted automatically even after the Spark application terminated so we can use it for some other process

What are the problems which can arise using "my" method? (Docs suggests checkpointing, which is more expensive, instead of this one for breaking lineages and I would like to know the reason)

This article will give detail information on cache and checkpoint. IIUC, your question is more on where we should use the checkpoint. let's discuss some practical scenarios where checkpointing is helpful

  1. Let's take a scenario where we have one dataset on which we want to perform 100 iterative operations and each iteration takes the last iteration result as input(Spark MLlib use cases). Now during this iterative process lineage is going to grow over the period. Here checkpointing dataset at a regular interval(let say every 10 iterations) will assure that in case of any failure we can start the process from last failure point.
  2. Let's take some batch example. Imagine we have a batch which is creating one master dataset with heavy lineage or complex computations. Now after some regular intervals, we are getting some data which should use earlier calculated master dataset. Here if we checkpoint our master dataset then it can be reused for all subsequent processes from different sparkSession.

My process is a batch process which is not-so-long and can be restarted without issues, so checkpointing is not benefit for me (I think).

That's correct, If your process is not heavy-computation/Big-lineage then there is no point of checkpointing. Thumb rule is if your dataset is not used multiple time and can be re-build faster than the time is taken and resources used for checkpoint/cache then we should avoid it. It will give more resources to your process.

pulsar
  • 141
  • 2
  • 13
SMaZ
  • 2,515
  • 1
  • 12
  • 26
  • CreateDataframe is enough for me (as I only need to truncate lineage in memory, no need to save memory or reuse data in future computation. I need to save time in the next actions' plan computation). From the article and your answer I'm almost sure it's not risky (ignoring the fact it's a dev API) since Spark will know the lineage anyways because it's still "in memory" (so if it crashes, it will recompute it or take it from the cache if it's cached), but it just doesn't mix it with the rest of the "action" so the plan computation is shorter. – BiS Sep 08 '19 at 19:50
  • I mean, the question was not oriented towards execution time savings or whether to use checkpoint or not, but more about catalyst processing time savings, with those solved (either by checkpoint or createDataframe). I just wanted to know if the createDataframe method introduced any RISK for the current execution (like not being able to re-process the RDD/re-read it from cache in case of executor failure), guessing its not a problem. – BiS Sep 08 '19 at 19:52
  • @BiS : Got it on what you are trying to do. Just one thing, If your enough resources and you are anyway going to cache then you don't need to do `createDataFrame` as well. Lineage will not be a concern if the dataset is cached as it will use `InMemoryTableScan` `InMemoryRelation` Plan, – SMaZ Sep 08 '19 at 19:58
  • 1
    Hmm, from my experience it does make a difference, caching somehow keeps the full plan, even though it resolves at some point to the InMemoryTableScan (I see that in Spark UI), I save like 2-3 minutes of the execution time (out of 10-11) by just forcibly truncating the plan (versus only cached). AFAIK Spark cache "resolving" is performed if the plan matches after calculating it (I guess maybe it's resolved in the physical planning part), I've seen cases of plans not using cached DFs because Catalyst optimizing changed them (not sure if that was a bug, but it forced me to do truncate as well). – BiS Sep 08 '19 at 20:03
  • @BiS: Interesting. Will debug something more in this. – SMaZ Sep 08 '19 at 20:14
  • If it helps, my use case builds a list of "clients" (example, not actually clients :) ) using a big-plan, then I cache/break lineage. In the following phase, I make many independent calculations (aka add new info about the client, combining many tables) and then union them, which accounts like for 20 unions. If I don't hide the lineage used to build the clients from Spark, Spark will spend quite a lot of time trying to optimize the plan of those 20 unions (each union is HUGE since it will keep the previous phase's lineage) for nothing, since they are cached and cant be optimized anymore. – BiS Sep 08 '19 at 20:23
  • Aka each union is just "take the base DF and add info from this 2-3 tables with some logic", so a quite small lineage if the base DF doesn't come have a big lineage/is truncated (I have to do unions+groupBy afterwards because the amount of enrichments depends on user inputs, and it also helps Spark to perform things in parallel) – BiS Sep 08 '19 at 20:25
0

I think the sparkSession.createDataFrame(df.rdd, df.schema) will impact the fault tolerance property of spark.

But the checkpoint() will save the RDD in hdfs or s3 and hence if failure occurs, it will recover from the last checkpoint data.

And in case of createDataFrame(), it just breaks the lineage graph.

סטנלי גרונן
  • 2,917
  • 23
  • 46
  • 68
  • 1
    Thanks! From my experience, spark seems to be able to rebuild those on crashes. I think the driver somehow keeps the lineage of those InMemoryTables in case they have to be rebuit. Not 100% sure tho, but never had any problem with it (most of my programs are fully-repetable tho). – BiS Jun 20 '20 at 12:30