
Spark 2.4.0


rdd = rdd.cache()
print(rdd.getStorageLevel())

Memory Serialized 1x Replicated

sc.setCheckpointDir("/tmp/checkpoints")
rdd.checkpoint()

Calling an action on the rdd:

rdd.count()

25066

Check if checkpointed:

rdd.isCheckpointed()

False

print(rdd.getCheckpointFile())

False

Dev
  • What is returned when you do `isLocallyCheckpointed()`? – Bala Nov 14 '19 at 12:05
  • @Bala it's returning False – Dev Nov 14 '19 at 12:07
  • Will Scala code work for you? If so, please add that tag and I will write an example – Salim Jan 17 '20 at 02:25
  • Does [this](https://stackoverflow.com/questions/54005223/spark-scala-checkpointing-data-set-showing-ischeckpointed-false-after-action) answer your doubt? – samkart Jan 17 '20 at 02:34
  • Essentially, you just need `checkpointed_rdd = rdd.checkpoint()`. Your `checkpointed_rdd.isCheckpointed()` should return true – samkart Jan 17 '20 at 02:37
  • Did you try another folder or changing permissions? Maybe the cause is just permissions. – lvnt Jan 17 '20 at 08:10
  • @samkart [rdd.checkpoint()](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.checkpoint) does not return an rdd. – Dev Jan 17 '20 at 08:50
  • @Salim I am working with PySpark, so I can't use Scala just for this. – Dev Jan 17 '20 at 08:52
  • @slmn looks like it's a permission issue or some config on EMR. – Dev Jan 17 '20 at 09:11
  • @devツ did I deserve the bounty? :) If it works, I will write an answer – lvnt Jan 17 '20 at 09:54
  • @slmn I'll verify on an older EMR cluster whether it's an HDFS permission issue or an older Spark version issue. If it's a permission issue, you definitely deserve the bounty :) – Dev Jan 17 '20 at 10:17

2 Answers


I tested in a standalone cluster using Spark 2.4.2, and checkpointing works there too.

    spark.sparkContext.setCheckpointDir("temp/")
    val textFile=spark.sparkContext.textFile("test1.txt")
    println("textFile.isCheckpointed = " + textFile.isCheckpointed)
    textFile.checkpoint()
    println("textFile.count() = " + textFile.count())
    println("textFile.isCheckpointed = " + textFile.isCheckpointed)

Result

    textFile.isCheckpointed = false
    textFile.count() = 8
    textFile.isCheckpointed = true
Salim

I tested on Spark 2.4.4 (EMR 5.28) and it's working.

It might be a permission or config issue on EMR, as I previously tried on Spark 2.4.3, and I don't see any checkpointing fix mentioned in the 2.4.4 release notes.
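If it is a permission problem, one way to check from the EMR master node might look like this (a sketch, not something from the original answer; the path matches the question's checkpoint dir, and `hdfs` is assumed to be on the PATH):

```shell
# Inspect the checkpoint directory's owner and permissions (path from the question)
hdfs dfs -ls -d /tmp/checkpoints

# Ensure the user running the Spark job can write there
hdfs dfs -mkdir -p /tmp/checkpoints
hdfs dfs -chmod 777 /tmp/checkpoints   # or chown to the job's user instead

# After the action runs, the rdd-* checkpoint files should show up here
hdfs dfs -ls -R /tmp/checkpoints
```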

df = spark.range(1, 7, 2)
df.show()

rdd = df.rdd
rdd = rdd.cache()
print("Storage Level - {}".format(rdd.getStorageLevel()))

print("Is Checkpointed - {}".format(rdd.isCheckpointed()))
print("Checkpoint File - {}".format(rdd.getCheckpointFile()))


# Setting HDFS directory
sc.setCheckpointDir("/tmp/checkpoint_dir/")
rdd.checkpoint()

print("Is Checkpointed - {}".format(rdd.isCheckpointed()))
print("Checkpoint File - {}".format(rdd.getCheckpointFile()))

# Calling an action
print("count - {}".format(rdd.count()))

print("Is Checkpointed - {}".format(rdd.isCheckpointed()))
print("Checkpoint File - {}".format(rdd.getCheckpointFile()))

Output:

+---+
| id|
+---+
|  1|
|  3|
|  5|
+---+

Storage Level - Memory Serialized 1x Replicated
Is Checkpointed - False
Checkpoint File - None
Is Checkpointed - False
Checkpoint File - None
count - 3
Is Checkpointed - True
Checkpoint File - hdfs://ip-xx-xx-xx-xx.ec2.internal:8020/tmp/checkpoint_dir/5d3bf642-cc17-4ffa-be10-51c58b8f5fcf/rdd-9
Dev