  • Goal: Read from Kinesis and store the data in S3 in Parquet format via Spark Streaming.
  • Situation: The application runs fine initially, with 1-hour batches and an average processing time under 30 minutes. Suppose the application crashes for some reason and we try to restart from the checkpoint: processing now takes forever and does not move forward. We tested the same thing with a batch interval of 1 minute; processing runs fine and each batch finishes in about 1.2 minutes, but when we recover from the checkpoint each batch takes about 15 minutes.
  • Notes: We are using S3 for checkpoints, with 1 executor (19 GB memory and 3 cores per executor); a sketch of the equivalent configuration follows this list.
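
A minimal sketch (assumed values, not the actual deployment) of the setup described above: one executor with 19g memory and 3 cores, checkpointing to S3. The spark.hadoop.* entries are copied into sc.hadoopConfiguration, which is where Config.scala below reads its settings; the bucket paths and app name are placeholders.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("S3Basin")
  .set("spark.executor.instances", "1")
  .set("spark.executor.memory", "19g")
  .set("spark.executor.cores", "3")
  .set("spark.hadoop.eventsS3Path", "s3a://<bucket>/events")
  .set("spark.hadoop.checkpointDirectory", "s3a://<bucket>/checkpoints")
  .set("spark.hadoop.kinesis.StreamingBatchDuration", "60")   // 1-hour batches (value is in minutes)
val sc = new SparkContext(conf)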

Attaching the screenshots:

First run (before checkpoint recovery):
  • Before checkpoint - Streaming Page
  • Before checkpoint - Jobs Page
  • Before checkpoint - Jobs Page 2

Trying to recover from checkpoint:
  • After checkpoint - Streaming Page
  • After checkpoint - Jobs Page

Config.scala

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.joda.time.DateTimeZone

object Config {

  // Shared Spark context and Hive context; application settings are read from
  // the Hadoop configuration below
  val sparkConf = new SparkConf()
  val sc = new SparkContext(sparkConf)
  val sqlContext = new HiveContext(sc)

  val eventsS3Path = sc.hadoopConfiguration.get("eventsS3Path")
  val useIAMInstanceRole = sc.hadoopConfiguration.getBoolean("useIAMInstanceRole",true)

  val checkpointDirectory =  sc.hadoopConfiguration.get("checkpointDirectory")

//  sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

  DateTimeZone.setDefault(DateTimeZone.forID("America/Los_Angeles"))

  val numStreams = 2

  def getSparkContext(): SparkContext = {
    this.sc
  }

  def getSqlContext(): HiveContext = {
    this.sqlContext
  }
}

S3Basin.scala

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.dstream.DStream

object S3Basin {
  def main(args: Array[String]): Unit = {
    Kinesis.startStreaming(s3basinFunction _)
  }

  def s3basinFunction(streams : DStream[Array[Byte]]): Unit ={
    streams.foreachRDD(jsonRDDRaw =>{
      println(s"Old partitions ${jsonRDDRaw.partitions.length}")
      val jsonRDD = jsonRDDRaw.coalesce(10,true)
      println(s"New partitions ${jsonRDD.partitions.length}")

      if(!jsonRDD.isEmpty()){
        val sqlContext =  SQLContext.getOrCreate(jsonRDD.context)

        // Unwrap the JSON payload when a record arrives inside a {"message": ...} envelope
        sqlContext.read.json(jsonRDD.map(f=>{
          val str = new String(f)
          if(str.startsWith("{\"message\"")){
            str.substring(11,str.indexOf("@version")-2)
          }
          else{
            str
          }
        })).registerTempTable("events")

        sqlContext.sql(
          """
            |select
            |to_date(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_date,
            |hour(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_hour,
            |*
            |from events
          """.stripMargin).coalesce(1).write.mode(SaveMode.Append).partitionBy("event_date", "event_hour","verb").parquet(Config.eventsS3Path)


        sqlContext.dropTempTable("events")
      }
    })
  }
}

Kinesis.scala

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kinesis.KinesisUtils

object Kinesis {

  def functionToCreateContext(streamFunc: (DStream[Array[Byte]]) => Unit): StreamingContext = {
    val streamingContext = new StreamingContext(Config.sc, Minutes(Config.sc.hadoopConfiguration.getInt("kinesis.StreamingBatchDuration",1)))   // new context
    streamingContext.checkpoint(Config.checkpointDirectory)   // set checkpoint directory
    val sc = Config.getSparkContext

    var awsCredentails : BasicAWSCredentials = null
    val kinesisClient = if(Config.useIAMInstanceRole){
      new AmazonKinesisClient()
    }
    else{
      awsCredentails = new BasicAWSCredentials(sc.hadoopConfiguration.get("kinesis.awsAccessKeyId"),sc.hadoopConfiguration.get("kinesis.awsSecretAccessKey"))
      new AmazonKinesisClient(awsCredentails)
    }


    val endpointUrl = sc.hadoopConfiguration.get("kinesis.endpointUrl")
    val appName = sc.hadoopConfiguration.get("kinesis.appName")

    val streamName = sc.hadoopConfiguration.get("kinesis.streamName")

    kinesisClient.setEndpoint(endpointUrl)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size

    val batchInterval = Minutes(sc.hadoopConfiguration.getInt("kinesis.StreamingBatchDuration",1))

    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval

    // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
    // DynamoDB of the same region as the Kinesis stream
    val regionName = sc.hadoopConfiguration.get("kinesis.regionName")


    val kinesisStreams = (0 until Config.numStreams).map { i =>
        println(s"creating stream for $i")
        if(Config.useIAMInstanceRole){
          KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, regionName,
            InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)

        }else{
          KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, regionName,
            InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2,awsCredentails.getAWSAccessKeyId,awsCredentails.getAWSSecretKey)

        }
      }

    val unionStreams = streamingContext.union(kinesisStreams)
    streamFunc(unionStreams)

    streamingContext
  }


  def startStreaming(streamFunc: (DStream[Array[Byte]]) => Unit) = {

    val sc = Config.getSparkContext

    if(sc.defaultParallelism < Config.numStreams+1){
      throw  new Exception(s"Number of shards = ${Config.numStreams} , number of processor = ${sc.defaultParallelism}")
    }

    val streamingContext =  StreamingContext.getOrCreate(Config.checkpointDirectory, () => functionToCreateContext(streamFunc))


//    sys.ShutdownHookThread {
//      println("Gracefully stopping Spark Streaming Application")
//      streamingContext.stop(true, true)
//      println("Application stopped greacefully")
//    }
//

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

DAG (screenshots attached)

Gaurav Shah

3 Answers


Raised a Jira issue: https://issues.apache.org/jira/browse/SPARK-19304

The issue is that we read more data per iteration than is required and then discard it. This can be avoided by adding a limit to the Kinesis GetRecords AWS call.

Fix: https://github.com/apache/spark/pull/16842
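
A rough illustration of the idea behind the fix (using the AWS SDK directly; this is not the code from the patch, and the stream name, shard id and limit below are placeholders): cap how many records a single GetRecords call returns instead of pulling the default maximum and throwing the excess away.

import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.{GetRecordsRequest, GetShardIteratorRequest, ShardIteratorType}

val client = new AmazonKinesisClient()
val shardIterator = client.getShardIterator(
  new GetShardIteratorRequest()
    .withStreamName("<stream-name>")
    .withShardId("<shard-id>")
    .withShardIteratorType(ShardIteratorType.TRIM_HORIZON))
  .getShardIterator

val result = client.getRecords(
  new GetRecordsRequest()
    .withShardIterator(shardIterator)
    .withLimit(500))                 // fetch only roughly what the recovered block needs
println(s"fetched ${result.getRecords.size} records")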

Gaurav Shah

When a failed driver is restarted, the following occurs:

  1. Recover computation – The checkpointed information is used to restart the driver, reconstruct the contexts and restart all the receivers.
  2. Recover block metadata – The metadata of all the blocks that will be necessary to continue the processing will be recovered.
  3. Re-generate incomplete jobs – For the batches with processing that has not completed due to the failure, the RDDs and corresponding jobs are regenerated using the recovered block metadata.
  4. Read the block saved in the logs – When those jobs are executed, the block data is read directly from the write ahead logs. This recovers all the necessary data that were reliably saved to the logs.
  5. Resend unacknowledged data – The buffered data that was not saved to the log at the time of failure will be sent again by the source, as it had not been acknowledged by the receiver.

Since all these steps are performed at the driver, your batch of 0 events takes so much time. This should happen only with the first batch; after that, things will be normal.

Reference here.
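
For reference, a minimal sketch (assumed paths and factory body, not the asker's setup) of how this recovery path is normally wired up: metadata checkpointing via StreamingContext.getOrCreate, plus the receiver write-ahead log mentioned in step 4.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

val conf = new SparkConf()
  .setAppName("recovery-sketch")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")   // step 4: replay blocks from the WAL

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Minutes(60))
  ssc.checkpoint("s3a://<bucket>/checkpoints")                    // hypothetical checkpoint directory
  // ... define input streams and output operations here ...
  ssc
}

// On restart, steps 1-3 above run inside getOrCreate before any batch is scheduled
val ssc = StreamingContext.getOrCreate("s3a://<bucket>/checkpoints", createContext _)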

Amit Kumar
  • Correct, we know that it happens only on the batches that need to be recovered via the checkpoints (not just the first one), and we also discovered that it happens on the driver, but how to solve this problem is the question. – Gaurav Shah Sep 14 '16 at 06:22
  • Checkpointing is really slow; have you tried Kryo serialization? Also consider using Datasets https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html, which have faster ser/deser with Encoders. – Amit Kumar Sep 14 '16 at 07:19
  • Slow is relative; checkpointing needs to serialize data to the checkpointing location, which is done by the receivers (extra work). After recovery, all operations on the data are re-computed, and this can involve network traffic since the data source is now your checkpointing location. Match your numbers against this comment on the Spark JIRA: https://issues.apache.org/jira/browse/SPARK-3129?focusedCommentId=14143775&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14143775 – Amit Kumar Sep 14 '16 at 08:57
  • We don't need to enable the WAL for Kinesis since Spark 1.5; it can use Kinesis as the source and recover from that. The checkpoint contains only metadata, which is not expensive to write. No ser/de, no data writing, only metadata: https://issues.apache.org/jira/browse/SPARK-9215 – Gaurav Shah Sep 14 '16 at 09:08
  • Which Spark version are you using? Please check whether https://issues.apache.org/jira/browse/SPARK-11316?focusedCommentId=15139658&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15139658 is relevant, as your DAG looks similar, in case you are using a lower version. – Amit Kumar Sep 14 '16 at 09:30
  • Thanks @amit_kumar, I did try the same thing on Spark 2 but am having a similar issue. – Gaurav Shah Sep 14 '16 at 10:22
  • I'm running Spark 2.0 and using Datasets - I see the exact same problem. – Glennie Helles Sindholt Sep 15 '16 at 07:11
  • @GlennieHellesSindholt I think we should create a Jira issue for Spark; they should be able to help. – Gaurav Shah Sep 17 '16 at 10:07
  • @GauravShah Did you create a Jira issue? I'm also interested in following this ticket. My Streaming tab only appears 45 minutes after the restart of the streaming job, and after 1 hour the batches take longer and longer, leading to an OOM. Since I don't get an OOM when I'm not restarting from the checkpoint, I wonder what happens under the hood. – crak Sep 19 '16 at 14:46
  • I also see `HDFSBackedStateStoreProvider: The state for version 71086 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.` on recovery, which can take a while. It makes sense if it needs to recover its state from a snapshot and some deltas, but it is pretty slow and can put the query in a bad state given the wrong trigger length. – chadlagore Aug 13 '20 at 21:58

I had similar issues before; my application kept getting slower and slower.

Try to release memory after you are done with an RDD by calling rdd.unpersist(): https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html#unpersist(boolean)

Or set spark.streaming.backpressure.enabled to true:

http://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-interval

http://spark.apache.org/docs/latest/streaming-programming-guide.html#requirements

Also, check your locality settings; maybe too much data is moving around. A minimal sketch of these suggestions follows.
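
A minimal sketch of these two suggestions (the config values and the stream handle are illustrative, not taken from the question):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream

val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")   // let Spark throttle the receivers
  .set("spark.locality.wait", "3s")                      // locality-related knob mentioned above

def process(stream: DStream[Array[Byte]]): Unit = {
  stream.foreachRDD { rdd =>
    // ... write the batch out ...
    rdd.unpersist(true)                                  // release cached blocks once the batch is done
  }
}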

keypoint
  • The application is taking time only in recovery, not in regular processing. `rdd.unpersist` would help if I were running out of memory, but that is not the case. Backpressure is useful if I cannot consume data as fast as it comes in, but I actually can. – Gaurav Shah Jan 20 '17 at 09:32