
This text is from an interesting article: http://www.lifeisafile.com/Apache-Spark-Caching-Vs-Checkpointing/

" ... Checkpointing stores the rdd physically to hdfs and destroys the lineage that created it. The checkpoint file won’t be deleted even after the Spark application terminated. Checkpoint files can be used in subsequent job run or driver program. Checkpointing an RDD causes double computation because the operation will first call a cache before doing the actual job of computing and writing to the checkpoint directory. ..."

I seem to remember reading elsewhere that checkpointed files were only for a Job or shared Jobs in a given Spark App.

Looking for clarification on how a new App could use the checkpoint directory, as I did not think that was possible.

thebluephantom

1 Answer


I seem to remember reading elsewhere that checkpointed files were only for the Job or shared Jobs in a given Spark App.

Spark will not purge the checkpoint directory even after the SparkContext is stopped. We can turn on auto cleanup by setting the property below:

spark.conf.set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
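Note that this is a core (non-SQL) setting, so if setting it at runtime does not stick, a safer variant is to put the property on the builder before the session starts. A minimal sketch, assuming local mode (the app name and master are placeholders):

// Sketch: enable automatic checkpoint cleanup before the SparkContext is created
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("checkpoint-cleanup-demo")   // placeholder app name
  .master("local[*]")                   // placeholder master
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
  .getOrCreate()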

Looking for clarification on how a new App could use the checkpoint directory, as I did not think that was possible.

To reuse the checkpointed dataset again, we can follow the steps below:

  1. Start context 1 and checkpoint the dataset:
// Setting logger on for ReliableRDDCheckpointData
scala> import org.apache.log4j.{Level, Logger}
scala> Logger.getLogger("org.apache.spark.rdd.ReliableRDDCheckpointData").setLevel(Level.INFO)

// Note application ID
scala> spark.sparkContext.applicationId
res1: String = local-1567969150914

// Set checkpoint Dir
scala> spark.sparkContext.setCheckpointDir("/tmp/spark/checkpoint")

// File system location
Users-Air:checkpoint User$ pwd
/tmp/spark/checkpoint
Users-Air:checkpoint User$ ls -lrth
total 0
drwxr-xr-x  2 User  wheel    64B Sep  8 15:00 7aabcb46-e707-49dd-8893-148a162368d5

// Create Dataframe
scala> val df = spark.range(3).withColumn("random", rand())
scala> df.show
+---+------------------+
| id|            random|
+---+------------------+
|  0|0.8517439782779789|
|  1| 0.288880016535247|
|  2|0.7027831376739603|
+---+------------------+

scala> df.schema
res5: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(random,DoubleType,false))

// Checkpoint
scala> df.checkpoint
19/09/08 15:02:22 INFO ReliableRDDCheckpointData: Done checkpointing RDD 7 to file:/tmp/spark/checkpoint/7aabcb46-e707-49dd-8893-148a162368d5/rdd-7, new parent is RDD 8
res6: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, random: double]

// New RDD saved in checkpoint directory /tmp/spark/checkpoint/7aabcb46-e707-49dd-8893-148a162368d5/rdd-7
Users-Air:7aabcb46-e707-49dd-8893-148a162368d5 User$ cd rdd-7/
Users-Air:rdd-7 User$ ls -lrth
total 32
-rw-r--r--  1 User  wheel     4B Sep  8 15:02 part-00000
-rw-r--r--  1 User  wheel   163B Sep  8 15:02 part-00002
-rw-r--r--  1 User  wheel   163B Sep  8 15:02 part-00001
-rw-r--r--  1 User  wheel   163B Sep  8 15:02 part-00003

// Stop context 
scala> spark.stop
scala> :quit

  2. Start a new context 2 and read the checkpointed dataset:
// Initialized new context
scala> spark.sparkContext.applicationId
res0: String = local-1567969525656

SparkContext.checkpointFile is a protected[spark] method, so we need to create a class under the org.apache.spark package:

scala> :paste -raw
// Entering paste mode (ctrl-D to finish)

package org.apache.spark
object RecoverCheckpoint {
  import scala.reflect.ClassTag
  import org.apache.spark.rdd.RDD
  def recover[T: ClassTag](sc: SparkContext, path: String): RDD[T] = {
    sc.checkpointFile[T](path)
  }
}

Now recover the checkpointed RDD as RDD[InternalRow] using the RecoverCheckpoint class above:

// Path from first context
scala> val checkPointFilePath = "/tmp/spark/checkpoint/7aabcb46-e707-49dd-8893-148a162368d5/rdd-7"
scala> import org.apache.spark.RecoverCheckpoint
scala> import org.apache.spark.sql.catalyst.InternalRow
scala> import org.apache.spark.sql.types._
scala> val RecoveredRDD = RecoverCheckpoint.recover[InternalRow](spark.sparkContext, checkPointFilePath)

// RDD is recovered as RDD[InternalRow]
scala> RecoveredRDD
res2: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = ReliableCheckpointRDD[0] at recover at <console>:34

// Count matches with original
RecoveredRDD.count
res3: Long = 3

To convert the recovered RDD to a Dataset, create a RecoverCheckpointRDDToDF class:


// Need to convert RDD[InternalRow] to DataFrame
scala> :paste -raw
// Entering paste mode (ctrl-D to finish)

// Creating Dataframe from RDD[InternalRow]
package org.apache.spark.sql
object RecoverCheckpointRDDToDF {
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, SparkSession}
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.types.StructType
  def createDataFrame(spark: SparkSession, catalystRows: RDD[InternalRow], schema: StructType): DataFrame = {
    spark.internalCreateDataFrame(catalystRows, schema)
  }
}

Finally, use RecoverCheckpointRDDToDF to get the dataset back:

// Schema should be known
val df_schema = StructType(List(StructField("id",LongType,false), StructField("random",DoubleType,false)))
df_schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(random,DoubleType,false))

scala> import org.apache.spark.sql.RecoverCheckpointRDDToDF
scala> val df = RecoverCheckpointRDDToDF.createDataFrame(spark, RecoveredRDD, df_schema)

scala> df.show
+---+------------------+
| id|            random|
+---+------------------+
|  0|0.8517439782779789|
|  1| 0.288880016535247|
|  2|0.7027831376739603|
+---+------------------+

// Same as first context

// Stop context
scala> spark.stop
scala> :quit
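
Putting the two helper objects together, here is a minimal sketch of a single recovery function (recoverCheckpointedDF is just an illustrative name; it assumes the checkpoint path and the original schema are known):

// Sketch: recover a checkpointed Dataset in a new application using the helpers above
import org.apache.spark.RecoverCheckpoint
import org.apache.spark.sql.{DataFrame, RecoverCheckpointRDDToDF, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

def recoverCheckpointedDF(spark: SparkSession, path: String, schema: StructType): DataFrame = {
  // Read the checkpoint files back as RDD[InternalRow]
  val rows = RecoverCheckpoint.recover[InternalRow](spark.sparkContext, path)
  // Re-attach the known schema to rebuild the DataFrame
  RecoverCheckpointRDDToDF.createDataFrame(spark, rows, schema)
}

Called as recoverCheckpointedDF(spark, checkPointFilePath, df_schema), this returns the same DataFrame as in the session above.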

SMaZ
  • ok, i am going to try that. gosh, i also read i am pretty sure only available in same context. ok that that is not so. awfully convoluted. when would we do this given we can use spark bucketBy between Apps? – thebluephantom Sep 08 '19 at 20:56
  • Good answer, buf I will not be implementing it unless you can convince me. – thebluephantom Sep 08 '19 at 20:57
  • @thebluephantom: :) tbh, I haven't put any use case in production with this technique but this seems to be a legit approach. What are your doubts? – SMaZ Sep 08 '19 at 21:11
  • I explored this for one of the use case while back but was not comfortable. – SMaZ Sep 08 '19 at 21:12
  • hard yakka. bucketBy seens easier to me by far, but it is a tricky topic so I am impressed. – thebluephantom Sep 08 '19 at 21:14
  • @ SMaZ..I had some problem way back related to checkpointing. I will try above options. It's informative. Could you please take a look at below question also. https://stackoverflow.com/questions/53647124/reading-from-hive-table-and-updating-same-table-in-pyspark-using-checkpoint – vikrant rana Sep 09 '19 at 17:40
  • I am an architect as well, and the way I see it, we would not use this approach. Too hard. This aspect spark.conf.set("spark.cleaner.referenceTracking.cleanCheckpoints", "true") is indeed a clue, I think we missed this and assumed the various blurbs. Once again, great stuff. – thebluephantom Sep 09 '19 at 17:46
  • @SMaZ.. I was trying to code above in pyspark but was not succeeded. I score null in Scala. Do you have Pyspark version as well for above code or Do let me know in case I have to raise a question. Thanks – vikrant rana Sep 14 '19 at 08:01
  • @vikrantrana Sorry, I don’t have a python version of it. In Pyspark the `checkpointFile` method is exposed as `sc._checkpointFile(path, deserializer)`, which will give you a `ReliableCheckpointRDD`. That might be the way to go. However, not sure on how to do the next step. – SMaZ Sep 18 '19 at 13:18