
This question is not something new, but after a lot of googling with no luck, I'm posting it here.

sc.setCheckpointDir("C:\\mydrive\\Checkpoint")
val data = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9)
val base = sc.parallelize(data)
base.checkpoint()                     // mark the RDD for checkpointing
base.collect().foreach { println(_) } // the action triggers the checkpoint write

The code above does checkpoint, but I'm not sure it reads the checkpointed data on subsequent runs. Please find the log details:

17/05/17 15:37:48 DEBUG ReliableCheckpointRDD: No partitioner file

17/05/17 15:37:48 INFO ReliableRDDCheckpointData: Done checkpointing RDD 0 to file:/C:/mydrive/Checkpoint/d10861cd-70c3-4e60-bdd3-a4753dfee1b2/rdd-0, new parent is RDD 1

The log gives me the impression that on subsequent runs it is writing the checkpointed data rather than reading it. If that is the case, how do I read checkpointed data in subsequent runs? What am I missing here?

Balaji Reddy

2 Answers


Checkpoints are used automatically when a checkpointed RDD is reused, but checkpointing is lazy, and if the RDD is not persisted, it will require recomputation.

Since your pipeline is completely linear and you don't reuse the RDD, checkpointing is pretty much useless here.
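
A minimal sketch of the reuse case where checkpointing does pay off, assuming an already configured SparkContext named sc (the path and values are illustrative):

sc.setCheckpointDir("C:\\mydrive\\Checkpoint")

val base = sc.parallelize(1 to 9)
base.persist()    // keep the computed partitions in memory so the
                  // checkpoint write does not recompute them
base.checkpoint() // mark the RDD; the file is written on the first action

base.count()      // first action: computes the RDD and writes the checkpoint
base.sum()        // reuse: served from the persisted data, with the lineage
                  // now truncated at the checkpoint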

  • Thank you for your response. I will go with cache in case I need to reuse the RDD within the app life cycle. What I'm trying to experiment with is: when I run the code for the first time, it should checkpoint the data; from the second time onward, it should read the checkpointed data and prepare the RDD. – Balaji Reddy May 17 '17 at 10:47

Checkpointing is a process of truncating the RDD lineage graph and saving it to a reliable distributed (HDFS) or local file system. There are two types of checkpointing:

  • reliable - in Spark (core), RDD checkpointing that saves the actual intermediate RDD data to a reliable distributed file system, e.g. HDFS.

  • local - in Spark Streaming or GraphX - RDD checkpointing that truncates the RDD lineage graph.

It’s up to the Spark application developer to decide when and how to checkpoint, using the RDD.checkpoint() method. Before checkpointing can be used, a Spark developer has to set the checkpoint directory using the SparkContext.setCheckpointDir(directory: String) method.

Reliable Checkpointing

You call SparkContext.setCheckpointDir(directory: String) to set the checkpoint directory - the directory where RDDs are checkpointed. The directory must be an HDFS path if running on a cluster. The reason is that the driver may attempt to reconstruct the checkpointed RDD from its own local file system, which is incorrect because the checkpoint files are actually on the executor machines.

You mark an RDD for checkpointing by calling RDD.checkpoint(). The RDD will be saved to a file inside the checkpoint directory and all references to its parent RDDs will be removed. This function has to be called before any job has been executed on this RDD.

It is strongly recommended that a checkpointed RDD is persisted in memory; otherwise, saving it to a file will require recomputation.
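
A short sketch of that recommendation, assuming the same sc as above; the point is that persist must come before the first action, or the checkpoint write recomputes the RDD from its lineage:

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 9).map(_ * 2)
rdd.persist(StorageLevel.MEMORY_ONLY) // cache first, so the checkpoint write
                                      // below reuses the cached partitions
rdd.checkpoint()                      // mark for checkpointing before any action
rdd.count()                           // runs the job and writes the checkpoint file;
                                      // without persist(), the map would run twice:
                                      // once for count() and once for the checkpoint write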

Let's assume you persisted an RDD named "users". To retrieve the preserved RDD by name (here via spark-jobserver's NamedRddSupport), please refer to the code snippet below:

import com.typesafe.config.Config
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// UsersRDDBuilder is assumed to provide the Reputation and User types
// and a build(sc) method that constructs the RDD from scratch.
trait UsersSparkJob extends spark.jobserver.SparkJob
    with spark.jobserver.NamedRddSupport
    with UsersRDDBuilder {

  val rddName = "users"

  def validate(sc: SparkContext, config: Config): spark.jobserver.SparkJobValidation =
    spark.jobserver.SparkJobValid
}

object GetOrCreateUsers extends UsersSparkJob {

  override def runJob(sc: SparkContext, config: Config) = {
    // Reuse the named RDD if an earlier job in the same job server
    // already registered it; otherwise build it and register it.
    val users: RDD[(Reputation, User)] = namedRdds.getOrElseCreate(
      rddName,
      build(sc))

    users.take(5)
  }
}
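
The design point here: the job server keeps a single long-lived SparkContext, so an RDD registered under a name by one job run is still available to the next run, and getOrElseCreate only falls back to build(sc) when nothing is cached under that name. Note this is object sharing within a running job server, not reading checkpoint files back from disk.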
FaigB
  • Thank you for your response. I will go with cache in case I need to reuse the RDD within the app life cycle. What I'm trying to experiment with is: when I run the code for the first time, it should checkpoint the data; from the second time onward, it should read the checkpointed data and prepare the RDD. Is this possible? – Balaji Reddy May 17 '17 at 10:53
  • Yes, it is possible, but it requires object sharing, and one solution for that can be https://github.com/spark-jobserver/spark-jobserver – FaigB May 17 '17 at 11:00
  • Thanks for the info. It would be better if you could suggest a way to read checkpointed files. – Balaji Reddy May 17 '17 at 11:02
  • Added an approach which you can investigate and adapt for your own case. – FaigB May 17 '17 at 11:14
  • Sure, that's helpful. – Balaji Reddy May 17 '17 at 11:17