
I added a checkpoint for the SparkContext and a write query for Kafka data streaming in a long-running Spark Structured Streaming job.

spark.sparkContext.setCheckpointDir("/tmp/checkpoint")

...

val monitoring_stream = monitoring_df.writeStream
                              .trigger(Trigger.ProcessingTime("120 seconds"))
                              .option("checkpointLocation", "s3a://spark-checkpoint/checkpointfiles")
                              .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
                                 if (!batchDF.isEmpty) {
                                   // per-batch processing goes here
                                 }
                              }
                              .start()

monitoring_stream.awaitTermination()

The Spark job runs stably. However, I noticed that the checkpoint files accumulate in HDFS and S3 without automatic cleanup, and the storage space is continuously being eaten up by these files. Is there a way to configure a retention time for these checkpoint files so they are automatically deleted? Or do I need to run a cron job to delete them manually? If I delete them manually, will it impact the ongoing Spark jobs? Thanks!

  • spark.cleaner.referenceTracking.cleanCheckpoints needs to be set to true, default is false. What do you have? – thebluephantom Sep 27 '20 at 11:55
  • Thanks thebluephantom, this option is helpful for cleaning up checkpoint files in the location defined in the SparkContext. I also checked that the checkpoint files generated by the Kafka write stream in S3 are automatically deleted on a daily basis regardless of whether I set this option. – yyuankm Sep 28 '20 at 08:33

1 Answer


spark.cleaner.referenceTracking.cleanCheckpoints needs to be set to true, default is false.
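As a minimal sketch of where this setting could go (the app name and checkpoint path below are placeholders, not from your job), the property can be set when building the session, before the checkpoint directory is registered:

import org.apache.spark.sql.SparkSession

// Placeholder session setup; only the config key itself is the relevant part.
val spark = SparkSession.builder()
  .appName("long-running-streaming-job")
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true") // default is false
  .getOrCreate()

// Checkpoint files written under this directory then become eligible for
// cleanup once the referencing RDDs go out of scope.
spark.sparkContext.setCheckpointDir("/tmp/checkpoint")

Note this governs the RDD checkpoint files under the SparkContext checkpoint directory; the structured streaming checkpointLocation used by the write query is managed separately.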
