0

I'm having some "lineage problems" when running a Spark Streaming application, which loads historical data from Elasticsearch on startup, and updates this data with data coming from Apache Kafka messages.

I posted a question some time ago because after a while, my application runs in a kind of deadlock, meaning that the amount of time needed to calculate the results is longer than the streaming window, thus stalling it.

See

The recommendation was to use checkpointing, which I tried. Still, the problem remains the same.

Here's my current sample code:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.elasticsearch.spark.sql._
import org.apache.log4j.Logger
import org.apache.log4j.Level

object ReadFromKafkaAndES {
  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)
    Logger.getLogger("kafka").setLevel(Level.WARN)

    System.setProperty("hadoop.home.dir", "D:\\Development\\fake-hadoop-for-spark")

    val checkpointDirectory = "D:/tmp/Spark"
    val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[4]")
    conf.set("es.nodes", "localhost")
    conf.set("es.port", "9200")

    val topicsSet = Array("sales").toSet
    var offsetRanges = Array[OffsetRange]()

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(20))
    ssc.checkpoint(checkpointDirectory)

    //Create SQLContect
    val sqlContext = new SQLContext(sc)

    //Get history data from ES
    var history : DataFrame = sqlContext.esDF("data/salesaggregation")

    //Kafka settings
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

    // Create direct kafka stream with brokers and topics
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    //Iterate
    messages.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD { rdd =>

      //If data is present, continue
      if (rdd.count() > 0) {

        //Register temporary table for the aggregated history
        history.registerTempTable("history")

        println("--- History -------------------------------")
        history.show()

        //Parse JSON as DataFrame
        val saleEvents = sqlContext.read.json(rdd.values)

        //Register temporary table for sales events
        saleEvents.registerTempTable("sales")

        val sales = sqlContext.sql("select userId, cast(max(saleTimestamp) as Timestamp) as latestSaleTimestamp, sum(totalRevenue) as totalRevenue, sum(totalPoints) as totalPoints from sales group by userId")

        println("--- Sales ---------------------------------")
        sales.show()

        val agg = sqlContext.sql("select a.userId, max(a.latestSaleTimestamp) as latestSaleTimestamp, sum(a.totalRevenue) as totalRevenue, sum(a.totalPoints) as totalPoints from ((select userId, latestSaleTimestamp, totalRevenue, totalPoints from history) union all (select userId, cast(max(saleTimestamp) as Timestamp) as latestSaleTimestamp, sum(totalRevenue) as totalRevenue, sum(totalPoints) as totalPoints from sales group by userId)) a group by userId")

        println("--- Aggregation ---------------------------")
        agg.show()

        //This is our new "history"
        history = agg.toDF()

        //As recommended, call checkpoint()
        history.rdd.checkpoint()

        //Save to Elasticsearch
        history.saveToEs("data/salesaggregation", Map("es.mapping.id" -> "userId"))

      }

    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

Is there somewhere a problem in my thinking? The code itself runs smoothly, but yet the problems remain.

I was also trying to use updatestatebykey, but it's giving me a hard time as a Spark and Scala beginner, because from what I understand it only works on DStream pairs, and I will have a number of fields to update for each userId.

EDIT:

I added history.explain(true) according to Holden's answer after the aggregation statement. This shows that the lineage increases with every iteration:

First iteration:

== Parsed Logical Plan ==
Repartition 4, false
 Aggregate [userId#4L], [userId#4L,MAX(latestSaleTimestamp#0) AS latestSaleTimestamp#13,SUM(totalRevenue#28) AS totalRevenue#14,SUM(totalPoints#29L) AS totalPoints#15L]
  Subquery a
   Union
    Project [userId#4L,latestSaleTimestamp#0,CAST(totalRevenue#3, DoubleType) AS totalRevenue#28,CAST(totalPoints#2, LongType) AS totalPoints#29L]
     Project [userId#4L,latestSaleTimestamp#0,totalRevenue#3,totalPoints#2]
      Subquery history
       LogicalRDD [latestSaleTimestamp#0,productList#1,totalPoints#2,totalRevenue#3,userId#4L], MapPartitionsRDD[1] at createDataFrame at EsSparkSQL.scala:28
    Aggregate [userId#12L], [userId#12L,CAST(MAX(saleTimestamp#9L), TimestampType) AS latestSaleTimestamp#25,SUM(totalRevenue#11) AS totalRevenue#26,SUM(totalPoints#10L) AS totalPoints#27L]
     Subquery sales
      Relation[lineItems#5,otherRevenue#6,productList#7,productRevenue#8,saleTimestamp#9L,totalPoints#10L,totalRevenue#11,userId#12L] org.apache.spark.sql.json.JSONRelation@eecc133

== Analyzed Logical Plan ==
userId: bigint, latestSaleTimestamp: timestamp, totalRevenue: double, totalPoints: bigint
Repartition 4, false
 Aggregate [userId#4L], [userId#4L,MAX(latestSaleTimestamp#0) AS latestSaleTimestamp#13,SUM(totalRevenue#28) AS totalRevenue#14,SUM(totalPoints#29L) AS totalPoints#15L]
  Subquery a
   Union
    Project [userId#4L,latestSaleTimestamp#0,CAST(totalRevenue#3, DoubleType) AS totalRevenue#28,CAST(totalPoints#2, LongType) AS totalPoints#29L]
     Project [userId#4L,latestSaleTimestamp#0,totalRevenue#3,totalPoints#2]
      Subquery history
       LogicalRDD [latestSaleTimestamp#0,productList#1,totalPoints#2,totalRevenue#3,userId#4L], MapPartitionsRDD[1] at createDataFrame at EsSparkSQL.scala:28
    Aggregate [userId#12L], [userId#12L,CAST(MAX(saleTimestamp#9L), TimestampType) AS latestSaleTimestamp#25,SUM(totalRevenue#11) AS totalRevenue#26,SUM(totalPoints#10L) AS totalPoints#27L]
     Subquery sales
      Relation[lineItems#5,otherRevenue#6,productList#7,productRevenue#8,saleTimestamp#9L,totalPoints#10L,totalRevenue#11,userId#12L] org.apache.spark.sql.json.JSONRelation@eecc133

== Optimized Logical Plan ==
Repartition 4, false
 Aggregate [userId#4L], [userId#4L,MAX(latestSaleTimestamp#0) AS latestSaleTimestamp#13,SUM(totalRevenue#28) AS totalRevenue#14,SUM(totalPoints#29L) AS totalPoints#15L]
  Union
   Project [userId#4L,latestSaleTimestamp#0,CAST(totalRevenue#3, DoubleType) AS totalRevenue#28,CAST(totalPoints#2, LongType) AS totalPoints#29L]
    LogicalRDD [latestSaleTimestamp#0,productList#1,totalPoints#2,totalRevenue#3,userId#4L], MapPartitionsRDD[1] at createDataFrame at EsSparkSQL.scala:28
   Aggregate [userId#12L], [userId#12L,CAST(MAX(saleTimestamp#9L), TimestampType) AS latestSaleTimestamp#25,SUM(totalRevenue#11) AS totalRevenue#26,SUM(totalPoints#10L) AS totalPoints#27L]
    Project [userId#12L,saleTimestamp#9L,totalRevenue#11,totalPoints#10L]
     Relation[lineItems#5,otherRevenue#6,productList#7,productRevenue#8,saleTimestamp#9L,totalPoints#10L,totalRevenue#11,userId#12L] org.apache.spark.sql.json.JSONRelation@eecc133

== Physical Plan ==
Repartition 4, false
 Aggregate false, [userId#4L], [userId#4L,MAX(PartialMax#34) AS latestSaleTimestamp#13,CombineSum(PartialSum#35) AS totalRevenue#14,CombineSum(PartialSum#36L) AS totalPoints#15L]
  Exchange (HashPartitioning 200)
   Aggregate true, [userId#4L], [userId#4L,MAX(latestSaleTimestamp#0) AS PartialMax#34,SUM(totalRevenue#28) AS PartialSum#35,SUM(totalPoints#29L) AS PartialSum#36L]
    Union
     Project [userId#4L,latestSaleTimestamp#0,CAST(totalRevenue#3, DoubleType) AS totalRevenue#28,CAST(totalPoints#2, LongType) AS totalPoints#29L]
      PhysicalRDD [latestSaleTimestamp#0,productList#1,totalPoints#2,totalRevenue#3,userId#4L], MapPartitionsRDD[1] at createDataFrame at EsSparkSQL.scala:28
     Aggregate false, [userId#12L], [userId#12L,CAST(MAX(PartialMax#40L), TimestampType) AS latestSaleTimestamp#25,CombineSum(PartialSum#41) AS totalRevenue#26,CombineSum(PartialSum#42L) AS totalPoints#27L]
      Exchange (HashPartitioning 200)
       Aggregate true, [userId#12L], [userId#12L,MAX(saleTimestamp#9L) AS PartialMax#40L,SUM(totalRevenue#11) AS PartialSum#41,SUM(totalPoints#10L) AS PartialSum#42L]
        PhysicalRDD [userId#12L,saleTimestamp#9L,totalRevenue#11,totalPoints#10L], MapPartitionsRDD[6] at foreachRDD at ReadFromKafkaAndES.scala:51

Code Generation: false
== RDD ==

Second iteration:

== Parsed Logical Plan ==
Repartition 4, false
 Aggregate [userId#4L], [userId#4L,MAX(latestSaleTimestamp#13) AS latestSaleTimestamp#147,SUM(totalRevenue#14) AS totalRevenue#148,SUM(totalPoints#15L) AS totalPoints#149L]
  Subquery a
   Union
    Project [userId#4L,latestSaleTimestamp#13,totalRevenue#14,totalPoints#15L]
     Subquery history
      Repartition 4, false
       Aggregate [userId#4L], [userId#4L,MAX(latestSaleTimestamp#0) AS latestSaleTimestamp#13,SUM(totalRevenue#28) AS totalRevenue#14,SUM(totalPoints#29L) AS totalPoints#15L]
        Subquery a
         Union
          Project [userId#4L,latestSaleTimestamp#0,CAST(totalRevenue#3, DoubleType) AS totalRevenue#28,CAST(totalPoints#2, LongType) AS totalPoints#29L]
           Project [userId#4L,latestSaleTimestamp#0,totalRevenue#3,totalPoints#2]
            Subquery history
             LogicalRDD [latestSaleTimestamp#0,productList#1,totalPoints#2,totalRevenue#3,userId#4L], MapPartitionsRDD[1] at createDataFrame at EsSparkSQL.scala:28
          Aggregate [userId#12L], [userId#12L,CAST(MAX(saleTimestamp#9L), TimestampType) AS latestSaleTimestamp#25,SUM(totalRevenue#11) AS totalRevenue#26,SUM(totalPoints#10L) AS totalPoints#27L]
           Subquery sales
            Relation[lineItems#5,otherRevenue#6,productList#7,productRevenue#8,saleTimestamp#9L,totalPoints#10L,totalRevenue#11,userId#12L] org.apache.spark.sql.json.JSONRelation@eecc133
    Aggregate [userId#146L], [userId#146L,CAST(MAX(saleTimestamp#143L), TimestampType) AS latestSaleTimestamp#159,SUM(totalRevenue#145) AS totalRevenue#160,SUM(totalPoints#144L) AS totalPoints#161L]
     Subquery sales
      Relation[lineItems#139,otherRevenue#140,productList#141,productRevenue#142,saleTimestamp#143L,totalPoints#144L,totalRevenue#145,userId#146L] org.apache.spark.sql.json.JSONRelation@eecc133

== Analyzed Logical Plan ==
userId: bigint, latestSaleTimestamp: timestamp, totalRevenue: double, totalPoints: bigint
Repartition 4, false
 Aggregate [userId#4L], [userId#4L,MAX(latestSaleTimestamp#13) AS latestSaleTimestamp#147,SUM(totalRevenue#14) AS totalRevenue#148,SUM(totalPoints#15L) AS totalPoints#149L]
  Subquery a
   Union
    Project [userId#4L,latestSaleTimestamp#13,totalRevenue#14,totalPoints#15L]
     Subquery history
      Repartition 4, false
       Aggregate [userId#4L], [userId#4L,MAX(latestSaleTimestamp#0) AS latestSaleTimestamp#13,SUM(totalRevenue#28) AS totalRevenue#14,SUM(totalPoints#29L) AS totalPoints#15L]
        Subquery a
         Union
          Project [userId#4L,latestSaleTimestamp#0,CAST(totalRevenue#3, DoubleType) AS totalRevenue#28,CAST(totalPoints#2, LongType) AS totalPoints#29L]
           Project [userId#4L,latestSaleTimestamp#0,totalRevenue#3,totalPoints#2]
            Subquery history
             LogicalRDD [latestSaleTimestamp#0,productList#1,totalPoints#2,totalRevenue#3,userId#4L], MapPartitionsRDD[1] at createDataFrame at EsSparkSQL.scala:28
          Aggregate [userId#12L], [userId#12L,CAST(MAX(saleTimestamp#9L), TimestampType) AS latestSaleTimestamp#25,SUM(totalRevenue#11) AS totalRevenue#26,SUM(totalPoints#10L) AS totalPoints#27L]
           Subquery sales
            Relation[lineItems#5,otherRevenue#6,productList#7,productRevenue#8,saleTimestamp#9L,totalPoints#10L,totalRevenue#11,userId#12L] org.apache.spark.sql.json.JSONRelation@eecc133
    Aggregate [userId#146L], [userId#146L,CAST(MAX(saleTimestamp#143L), TimestampType) AS latestSaleTimestamp#159,SUM(totalRevenue#145) AS totalRevenue#160,SUM(totalPoints#144L) AS totalPoints#161L]
     Subquery sales
      Relation[lineItems#139,otherRevenue#140,productList#141,productRevenue#142,saleTimestamp#143L,totalPoints#144L,totalRevenue#145,userId#146L] org.apache.spark.sql.json.JSONRelation@eecc133

== Optimized Logical Plan ==
Repartition 4, false
 Aggregate [userId#4L], [userId#4L,MAX(latestSaleTimestamp#13) AS latestSaleTimestamp#147,SUM(totalRevenue#14) AS totalRevenue#148,SUM(totalPoints#15L) AS totalPoints#149L]
  Union
   Repartition 4, false
    Aggregate [userId#4L], [userId#4L,MAX(latestSaleTimestamp#0) AS latestSaleTimestamp#13,SUM(totalRevenue#28) AS totalRevenue#14,SUM(totalPoints#29L) AS totalPoints#15L]
     Union
      Project [userId#4L,latestSaleTimestamp#0,CAST(totalRevenue#3, DoubleType) AS totalRevenue#28,CAST(totalPoints#2, LongType) AS totalPoints#29L]
       LogicalRDD [latestSaleTimestamp#0,productList#1,totalPoints#2,totalRevenue#3,userId#4L], MapPartitionsRDD[1] at createDataFrame at EsSparkSQL.scala:28
      Aggregate [userId#12L], [userId#12L,CAST(MAX(saleTimestamp#9L), TimestampType) AS latestSaleTimestamp#25,SUM(totalRevenue#11) AS totalRevenue#26,SUM(totalPoints#10L) AS totalPoints#27L]
       Project [userId#12L,saleTimestamp#9L,totalRevenue#11,totalPoints#10L]
        Relation[lineItems#5,otherRevenue#6,productList#7,productRevenue#8,saleTimestamp#9L,totalPoints#10L,totalRevenue#11,userId#12L] org.apache.spark.sql.json.JSONRelation@eecc133
   Aggregate [userId#146L], [userId#146L,CAST(MAX(saleTimestamp#143L), TimestampType) AS latestSaleTimestamp#159,SUM(totalRevenue#145) AS totalRevenue#160,SUM(totalPoints#144L) AS totalPoints#161L]
    Project [userId#146L,saleTimestamp#143L,totalRevenue#145,totalPoints#144L]
     Relation[lineItems#139,otherRevenue#140,productList#141,productRevenue#142,saleTimestamp#143L,totalPoints#144L,totalRevenue#145,userId#146L] org.apache.spark.sql.json.JSONRelation@eecc133

== Physical Plan ==
Repartition 4, false
 Aggregate false, [userId#4L], [userId#4L,MAX(PartialMax#166) AS latestSaleTimestamp#147,CombineSum(PartialSum#167) AS totalRevenue#148,CombineSum(PartialSum#168L) AS totalPoints#149L]
  Exchange (HashPartitioning 200)
   Aggregate true, [userId#4L], [userId#4L,MAX(latestSaleTimestamp#13) AS PartialMax#166,SUM(totalRevenue#14) AS PartialSum#167,SUM(totalPoints#15L) AS PartialSum#168L]
    Union
     Repartition 4, false
      Aggregate false, [userId#4L], [userId#4L,MAX(PartialMax#172) AS latestSaleTimestamp#13,CombineSum(PartialSum#173) AS totalRevenue#14,CombineSum(PartialSum#174L) AS totalPoints#15L]
       Exchange (HashPartitioning 200)
        Aggregate true, [userId#4L], [userId#4L,MAX(latestSaleTimestamp#0) AS PartialMax#172,SUM(totalRevenue#28) AS PartialSum#173,SUM(totalPoints#29L) AS PartialSum#174L]
         Union
          Project [userId#4L,latestSaleTimestamp#0,CAST(totalRevenue#3, DoubleType) AS totalRevenue#28,CAST(totalPoints#2, LongType) AS totalPoints#29L]
           PhysicalRDD [latestSaleTimestamp#0,productList#1,totalPoints#2,totalRevenue#3,userId#4L], MapPartitionsRDD[1] at createDataFrame at EsSparkSQL.scala:28
          Aggregate false, [userId#12L], [userId#12L,CAST(MAX(PartialMax#178L), TimestampType) AS latestSaleTimestamp#25,CombineSum(PartialSum#179) AS totalRevenue#26,CombineSum(PartialSum#180L) AS totalPoints#27L]
           Exchange (HashPartitioning 200)
            Aggregate true, [userId#12L], [userId#12L,MAX(saleTimestamp#9L) AS PartialMax#178L,SUM(totalRevenue#11) AS PartialSum#179,SUM(totalPoints#10L) AS PartialSum#180L]
             PhysicalRDD [userId#12L,saleTimestamp#9L,totalRevenue#11,totalPoints#10L], MapPartitionsRDD[45] at foreachRDD at ReadFromKafkaAndES.scala:51
     Aggregate false, [userId#146L], [userId#146L,CAST(MAX(PartialMax#196L), TimestampType) AS latestSaleTimestamp#159,CombineSum(PartialSum#197) AS totalRevenue#160,CombineSum(PartialSum#198L) AS totalPoints#161L]
      Exchange (HashPartitioning 200)
       Aggregate true, [userId#146L], [userId#146L,MAX(saleTimestamp#143L) AS PartialMax#196L,SUM(totalRevenue#145) AS PartialSum#197,SUM(totalPoints#144L) AS PartialSum#198L]
        PhysicalRDD [userId#146L,saleTimestamp#143L,totalRevenue#145,totalPoints#144L], MapPartitionsRDD[46] at foreachRDD at ReadFromKafkaAndES.scala:51

Code Generation: false
== RDD ==

So, it seems the checkpointing has not the desired effect, but I still can't figure out how to fix this. Thanks for any hint!

Community
  • 1
  • 1
Tobi
  • 31,405
  • 8
  • 58
  • 90

2 Answers2

1

I'd recommend looking into updateStateByKey some more and considering flatmapping the updates (so each input record can create multiple possible updates).

One interesting thing is, your checkpointing the rdd of the history DataFrame, but when you register it as a temp table it always uses the logical plan so your checkpointing might not be doing anything (but I'm not sure and its close to 1am here, so I'd add in an explain in your debugging and you can see what the lineage ends up looking like).

Holden
  • 7,392
  • 1
  • 27
  • 33
  • Thanks for your fast answer! I think the hint concerning the checkpointing of the `history.rdd` is worth looking into... Would there a better way to do this on the `DataFrame` itself? I also tried `history.cache()`, but this doesn't really help either. BTW, I'm a huge fan of your book :-) – Tobi Aug 27 '15 at 08:05
  • 1
    I'm glad you enjoyed my book :) So caching the `DataFrame` doesn't allow Spark to discard the lineage information. Spark SQL's lineage information is a bit more complex. As you've noticed with the updated debugging information, the checkpointing isn't doing anything. I suspect the best case forward would be the flatMap + updateStateByKey approach, but if that doesn't work you could try periodically checkpointing the RDD and constructing a new DataFrame on the checkpointed RDD. – Holden Aug 27 '15 at 18:34
0

You are checkpointing each RDD that comes in every 20 seconds. How many RDDs is that? It could be dozens. Checkpointing writes the dataset to the filesystem which kind of defeats one of Sparks main selling points: in-memory processing.

Things which I think you should consider:

  • Don't checkpoint every iteration, rather do so every 5-10 iterations.
  • Reconsider calling checkpoint within foreachRDD. Perhaps try to create the new aggregates within a map function, then call coalesce(1) / repatriation(1) on the output RDD to reduce the resulting aggregates down to one RDD partition. And then checkpoint that every 5 - 10 sliding intervals.

Doing those two things should reduce the amount of file-system interaction dramatically.

Patrick McGloin
  • 2,204
  • 1
  • 14
  • 26
  • Thanks for your answer. To my understanding, my issue is not the file-system interactions (you're completely right about "defeating" Spark's benefits), and the potential number of checkpoints-writes. The issue is IMHO that the new "history" is not persited for the next iteration, but is recalculated again and again, which takes longer with each iteration. And that's not how, at least how I, expected this to work. – Tobi Aug 27 '15 at 08:47
  • I would be suspicious about this. If you are having performance problems I think you are introducing file system performance bottlencks. Can you check how many partitions are in your streaming RDD each iteration? And how big are these checkpoint directories? It could be that writes are not finishing before the next iteration is starting. – Patrick McGloin Aug 27 '15 at 08:55
  • I'm having the lineage issues (which are visible with the debugging output, and the graphs in my other question) with and without checkpointing enabled, so I guess it can't be about file-system-specific issues then. The data which is written is only a few kbytes per iteration as I'm only using test data currently. – Tobi Aug 27 '15 at 08:57
  • The recommendation for checkpointing is to checkpoint once every 5 - 10 iterations. You are checkpointing dozens, perhaps hundreds of times every iteration. I really think you should look at implementing checkpointing more along the lines of the recommendations. Anyway, good luck with it. – Patrick McGloin Aug 27 '15 at 09:04
  • I think it's not correct that it checkpoints dozens or perhaps hundreds of times per iteration. I think it's exactly one time. The messages from Kafka are queued for each 20sec interval IMHO. The aggreation takes place only *once* per interval. Or I have a complete misunderstanding how Spark Streaming works with Kafka (which is possible, as I'm a beginner) – Tobi Aug 27 '15 at 09:07
  • Are you are its exactly once? foreachRDD only gets called once each iteration? Check your debug statements or the size of the RDD in the Spark UI. The only other issue I can think of is a race condition on your history var, but that should not be an issue if every iteration is only called once. – Patrick McGloin Aug 27 '15 at 09:18
  • Here's a screenshot of my checkpoint directory: http://imgur.com/YqLmHst Each checkpoint is around 22kbytes. And there are 3 checkpoints per minute, which matches the 20sec interval. – Tobi Aug 27 '15 at 09:22
  • That does look like just one partition in each RDD, which should mean you don't have race conditions on the var. You could add a simple repartition(1) call just before your foreachRDD to make sure about that. – Patrick McGloin Aug 27 '15 at 09:47