
I am trying to test the program below, which takes a checkpoint and should read the data back from the checkpoint location if the application fails for any reason, such as resource unavailability. When I kill the job and trigger it again, execution restarts from the beginning. I am not sure what else is required to achieve this. Thanks!!

Below is the code:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object withCheckpoint {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create the SparkSession first and reuse its SparkContext, so the application
    // has a single context and a single name
    val spark = SparkSession.builder.appName("With Checkpoint").getOrCreate()
    val sc = spark.sparkContext

    // Set the checkpoint directory (on a cluster this should be a shared, fault-tolerant path such as HDFS)
    val checkpointDirectory = "/tmp"
    sc.setCheckpointDir(checkpointDirectory)

    /************************************************************************************************************************************************/
    /*                                                Reading source data begins here                                                               */
    /************************************************************************************************************************************************/

    val readCtryDemoFile = spark.read.option("header", "true").csv("/tmp/Ctry_Demo.csv")

    val readCtryRefFile = spark.read.option("header", "true").csv("/tmp/ref_ctry.csv")

    val readCtryCntntFile = spark.read.option("header", "true").csv("/tmp/ctry_to_continent.csv")

    /************************************************************************************************************************************************/
    /*                                                Reading source data completes                                                                 */
    /************************************************************************************************************************************************/

    /************************************************************************************************************************************************/
    /*                                                Transformation begins here                                                                    */
    /************************************************************************************************************************************************/

    /*********************************************************************************/
    /* Join the DataFrames created above to pull the respective columns              */
    /*********************************************************************************/

    val jnCtryDemoCtryref = readCtryDemoFile.join(readCtryRefFile, Seq("NUM_CTRY_CD"))

    val jnCtryCntnt = jnCtryDemoCtryref.join(readCtryCntntFile, Seq("Alpha_2_CTRY_CD"))

    /*********************************************************************************/
    /* Checkpoint the DataFrame created above to the checkpoint directory            */
    /*********************************************************************************/

    // Dataset.checkpoint() is eager by default, so the data is written here;
    // no extra action such as collect() is needed to trigger it
    val jnCtryCntntchkpt = jnCtryCntnt.checkpoint()

    /*********************************************************************************/
    /* Creating multiple outputs based on different aggregation keys                 */
    /*********************************************************************************/

    val aggCntnNm = jnCtryCntntchkpt.groupBy("CONTINENT_NM").agg(sum("POPULATION").as("SUM_POPULATION")).orderBy("CONTINENT_NM")
    aggCntnNm.show()

    val aggCtryNm = jnCtryCntntchkpt.groupBy("Ctry_NM").agg(sum("POPULATION").as("SUM_POPULATION")).orderBy("Ctry_NM")
    aggCtryNm.show()

    val aggCtryCd = jnCtryCntntchkpt.groupBy("NUM_CTRY_CD").agg(sum("POPULATION").as("SUM_POPULATION")).orderBy("NUM_CTRY_CD")
    aggCtryCd.show()

    /************************************************************************************************************************************************/
    /*                                                Transformation completes                                                                      */
    /************************************************************************************************************************************************/

    spark.stop()
  }
}
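One thing worth knowing when reading this program: the files written by `Dataset.checkpoint()` only serve the application that wrote them. Each run's `setCheckpointDir` call creates a new, randomly named subdirectory, and a restarted application never looks up checkpoint files left by an earlier run, so the join is recomputed. To skip that work on restart, the intermediate result has to be saved and reloaded explicitly. Below is a minimal sketch of that approach (a swap from checkpointing to plain Parquet output), assuming the joined columns have unique names (Parquet rejects duplicates); the helper `loadOrCompute` and the path `/tmp/jnCtryCntnt_parquet` are hypothetical.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.sum

object ResumableJoin {

  // Hypothetical helper: reuse a result saved by an earlier run if it is complete,
  // otherwise compute it, save it as Parquet, and read the saved copy back
  def loadOrCompute(spark: SparkSession, path: String)(compute: => DataFrame): DataFrame = {
    val hadoopPath = new Path(path)
    val fs = hadoopPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
    // By default Spark writes a _SUCCESS marker only after the write job commits,
    // so output left behind by a killed run is not treated as finished
    if (!fs.exists(new Path(hadoopPath, "_SUCCESS"))) {
      compute.write.mode("overwrite").parquet(path)
    }
    spark.read.parquet(path)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("With Checkpoint").getOrCreate()

    // Hypothetical location for the saved join result
    val jnCtryCntnt = loadOrCompute(spark, "/tmp/jnCtryCntnt_parquet") {
      val demo  = spark.read.option("header", "true").csv("/tmp/Ctry_Demo.csv")
      val ref   = spark.read.option("header", "true").csv("/tmp/ref_ctry.csv")
      val cntnt = spark.read.option("header", "true").csv("/tmp/ctry_to_continent.csv")
      demo.join(ref, Seq("NUM_CTRY_CD")).join(cntnt, Seq("Alpha_2_CTRY_CD"))
    }

    jnCtryCntnt.groupBy("CONTINENT_NM")
      .agg(sum("POPULATION").as("SUM_POPULATION"))
      .orderBy("CONTINENT_NM")
      .show()

    spark.stop()
  }
}
```

On the second and later runs the join block is not evaluated at all; deleting the Parquet directory forces a full recomputation.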
NRC
  • I would follow these links to understand checkpointing and persisting and when to use one or the other: https://stackoverflow.com/questions/35127720/what-is-the-difference-between-spark-checkpoint-and-persist-to-a-disk and this one may be useful too: https://stackoverflow.com/questions/36632356/what-does-checkpointing-do-on-apache-spark – Chema Jun 05 '20 at 11:06
  • What I am unable to understand, based on my limited knowledge in this area, is what I need to add to my Spark application so that it reads the checkpoint directory in case of failure. Also, am I supposed to clean up the directory at the end of the application? I can see it keeps writing data to the directory location under different names. Thanks!! – NRC Jun 08 '20 at 01:13
  • In the case of Spark Streaming, it is mandatory to set a checkpoint directory, both for recovery in case of failure and for computing some intermediate results, and it is read automatically; you don't have to do anything more. In the case of an RDD or DataFrame, it could be better to persist, because persisting maintains the lineage to recover in case of failure while avoiding recomputation. – Chema Jun 08 '20 at 07:52
  • Below is the program I am running to test whether checkpointing works as expected. After the checkpoint was taken, I killed the job and triggered it again, but the execution didn't start from the checkpoint location; it restarted from scratch. I am not sure what exactly I am missing. The program is included for your reference in case it helps. Any help is much appreciated. Thanks!! – NRC Jun 15 '20 at 02:24
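The comment thread notes that with Spark Streaming the checkpoint is read back automatically. In Structured Streaming this is done through the `checkpointLocation` option: a query restarted with the same location continues from the recorded source offsets and aggregation state instead of starting over. A minimal sketch, assuming a directory of CSV files as the stream source and the console sink for output; the schema, both paths and the name `startQuery` are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object StreamingWithCheckpoint {

  // Starts a streaming aggregation whose progress (source offsets and aggregation
  // state) is recorded under checkpointDir; restarting with the same checkpointDir resumes it
  def startQuery(spark: SparkSession, inputDir: String, checkpointDir: String): StreamingQuery = {
    // Streaming file sources need an explicit schema (hypothetical columns)
    val schema = new StructType()
      .add("CONTINENT_NM", StringType)
      .add("POPULATION", LongType)

    spark.readStream
      .schema(schema)
      .option("header", "true")
      .csv(inputDir)
      .groupBy("CONTINENT_NM")
      .agg(sum("POPULATION").as("SUM_POPULATION"))
      .writeStream
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", checkpointDir)
      .start()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("Streaming With Checkpoint").getOrCreate()
    // Hypothetical paths: new CSV files dropped into the input directory are picked up
    startQuery(spark, "/tmp/ctry_stream_input", "/tmp/ctry_stream_checkpoint").awaitTermination()
  }
}
```

Unlike the batch `checkpoint()` call in the question, this location is meant to be reused across restarts, so it should not be deleted between runs.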

0 Answers
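On the clean-up question in the comments: `SparkContext.setCheckpointDir` creates a new, randomly named subdirectory for every application (which is why new names keep appearing under `/tmp`), and by default Spark does not delete those files when the job ends. A minimal sketch of two options, assuming Spark 2.x or later and that the checkpointed data is not needed after the run: the `spark.cleaner.referenceTracking.cleanCheckpoints` setting (default `false`) lets Spark remove the files of checkpointed RDDs once they are garbage-collected, and an explicit delete of `sc.getCheckpointDir` removes this run's subdirectory before stopping. The helper name `deleteCheckpointDir` is hypothetical.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

object CheckpointCleanup {

  // Hypothetical helper: deletes only this application's checkpoint subdirectory
  // (the randomly named folder created by setCheckpointDir), never its parent
  def deleteCheckpointDir(spark: SparkSession): Boolean = {
    val sc = spark.sparkContext
    sc.getCheckpointDir match {
      case Some(dir) =>
        val path = new Path(dir)
        path.getFileSystem(sc.hadoopConfiguration).delete(path, true)
      case None => false
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("With Checkpoint")
      // Optional: remove checkpoint files of checkpointed RDDs once they are
      // garbage-collected (default: false)
      .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
      .getOrCreate()
    spark.sparkContext.setCheckpointDir("/tmp")

    // Stand-in for the real pipeline: checkpoint() is eager, so files are written here
    val checkpointed = spark.range(10).checkpoint()
    println(checkpointed.count())

    // Explicit clean-up at the end of the run
    deleteCheckpointDir(spark)
    spark.stop()
  }
}
```

Deleting the subdirectory rather than the configured path matters here, because the question uses `/tmp` itself as the checkpoint directory.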