
I have a dataframe in Spark where an entire partition from Hive has been loaded, and I need to break the lineage to overwrite the same partition after some modifications to the data. However, when the Spark job is done I am left with the data from the checkpoint on HDFS. Why does Spark not clean this up by itself, or is there something I am missing?

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

spark.sparkContext.setCheckpointDir("/home/user/checkpoint/")
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

val df = spark.table("db.my_table").filter(col("partition") === 2)

// ... transformations to the dataframe

val checkpointDf = df.checkpoint()
checkpointDf.write.format("parquet").mode(SaveMode.Overwrite).insertInto("db.my_table")

After this I have this file on HDFS:

/home/user/checkpoint/214797f2-ce2e-4962-973d-8f215e5d5dd8/rdd-23/part-00000

And each time I run the Spark job I just get a new directory with a new unique ID, containing files for each RDD that has been in the dataframes.

aweis

1 Answer


Spark has an implicit mechanism for cleaning up checkpoint files.

Add this property to spark-defaults.conf:

spark.cleaner.referenceTracking.cleanCheckpoints  true  # default is false

You can find more about this in the official Spark configuration page.
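If you would rather not edit spark-defaults.conf, the same flag can be passed per job, for example with --conf on spark-submit, or when building the SparkSession as sketched below. Note that the cleaner reads it when the SparkContext is created, so setting it with spark.conf.set after startup is too late. A minimal sketch (the app name is just a placeholder):

import org.apache.spark.sql.SparkSession

// The cleaner flag must be in the config before the context starts,
// e.g. here or via --conf spark.cleaner.referenceTracking.cleanCheckpoints=true
val spark = SparkSession.builder()
  .appName("overwrite-partition-job")
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
  .getOrCreate()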

If you want to remove the checkpoint directory from HDFS yourself, you can do it at the end of your script; in Python, for example, you could use rmtree.
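Since the rest of the job is in Scala and the checkpoint directory lives on HDFS, a rough equivalent is to delete it through the Hadoop FileSystem API at the end of the job. A small sketch, assuming the same directory that was passed to setCheckpointDir:

import org.apache.hadoop.fs.{FileSystem, Path}

// Recursively delete the whole checkpoint directory once the write has finished.
// FileSystem.get picks up the cluster's HDFS settings from the Spark context.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.delete(new Path("/home/user/checkpoint/"), true) // true = recursive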

With spark.cleaner.referenceTracking.cleanCheckpoints set to true, the cleaner is allowed to remove old checkpoint files inside the checkpoint directory.

ggeop
  • Hi, I tried to add it to my spark2-shell command and I could see it set in the Spark history server, but when I run my code and close the shell with "sys.exit" the checkpoint folder is still on HDFS. Can this not be specified for the individual Spark jobs that I submit? – aweis Feb 01 '20 at 05:35
  • @aweis the checkpoint will never be deleted from HDFS. If you want to remove the checkpoint directory from HDFS you can remove it with Python; at the end of your script you could use [rmtree](https://docs.python.org/3/library/shutil.html#shutil.rmtree). With `spark.cleaner.referenceTracking.cleanCheckpoints` set to true, Spark will delete old checkpoint files inside the checkpoint directory. – ggeop Feb 01 '20 at 17:58
  • thanks for the suggestions, however doing a manual delete of the checkpoint location kind of defeats the purpose, as I then need to create unique folders for each checkpointed DF to be sure there are no race conditions (like Spark does under the checkpoint location). There must be a way of configuring Spark to clean up any data spills when it is done - this is potentially many GB left floating on HDFS – aweis Feb 02 '20 at 17:22
  • @aweis you don't have another choice for now: either you use `spark.cleaner.referenceTracking.cleanCheckpoints`, which lets Spark apply a retention policy to old data but gives you no control over it, or you manually clean the directory at the end of the process, so you can be sure you don't leave any files on HDFS. In your case, because as I understand it this is not a streaming application, I suggest manually cleaning the files; it's simple and you keep control. On the other hand, if you had a streaming application, it would be better to let Spark handle the checkpoint files – ggeop Feb 02 '20 at 17:30