
I posted another question on a similar topic a few days ago:

I managed to get at least a "working" solution now, meaning that the process itself seems to run correctly. But, as I am a complete beginner with Spark, I seem to have missed some things about how to build this kind of application the right way (performance-/computation-wise)...

What I want to do:

  1. Load history data from ElasticSearch upon application startup
  2. Start listening to a Kafka topic (sales events, passed as JSON strings) with Spark Streaming on startup
  3. For each incoming RDD, do an aggregation per user
  4. Union the results from 3. with the history
  5. Aggregate the new values (e.g. total revenue) per user
  6. Use the results from 5. as the new "history" for the next iteration

My code is the following:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.elasticsearch.spark.sql._
import org.apache.log4j.Logger
import org.apache.log4j.Level

object ReadFromKafkaAndES {
  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)
    Logger.getLogger("kafka").setLevel(Level.WARN)

    val checkpointDirectory = "/tmp/Spark"
    val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[4]")
    conf.set("es.nodes", "localhost")
    conf.set("es.port", "9200")

    val topicsSet = Array("sales").toSet

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(15))
    ssc.checkpoint(checkpointDirectory)

    //Create SQLContext
    val sqlContext = new SQLContext(sc)

    //Get history data from ES
    var history = sqlContext.esDF("data/salesaggregation")

    //Kafka settings
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

    // Create direct kafka stream with brokers and topics
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    //Iterate
    messages.foreachRDD { rdd =>

      //If data is present, continue
      if (rdd.count() > 0) {

        //Register temporary table for the aggregated history
        history.registerTempTable("history")

        println("--- History -------------------------------")
        history.show()

        //Parse JSON as DataFrame
        val saleEvents = sqlContext.read.json(rdd.values)

        //Register temporary table for sales events
        saleEvents.registerTempTable("sales")

        val sales = sqlContext.sql("select userId, cast(max(saleTimestamp) as Timestamp) as latestSaleTimestamp, sum(totalRevenue) as totalRevenue, sum(totalPoints) as totalPoints from sales group by userId")

        println("--- Sales ---------------------------------")
        sales.show()

        val agg = sqlContext.sql("select a.userId, max(a.latestSaleTimestamp) as latestSaleTimestamp, sum(a.totalRevenue) as totalRevenue, sum(a.totalPoints) as totalPoints from ((select userId, latestSaleTimestamp, totalRevenue, totalPoints from history) union all (select userId, cast(max(saleTimestamp) as Timestamp) as latestSaleTimestamp, sum(totalRevenue) as totalRevenue, sum(totalPoints) as totalPoints from sales group by userId)) a group by userId")

        println("--- Aggregation ---------------------------")
        agg.show()

        //This is our new "history"
        history = agg

        //Cache results
        history.cache()

        //Drop temporary table
        sqlContext.dropTempTable("history")

      }

    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

The computations seem to work correctly:

--- History -------------------------------
+--------------------+--------------------+-----------+------------+------+
| latestSaleTimestamp|         productList|totalPoints|totalRevenue|userId|
+--------------------+--------------------+-----------+------------+------+
|2015-07-22 10:03:...|Buffer(47, 1484, ...|         91|       12.05|    23|
|2015-07-22 12:50:...|Buffer(256, 384, ...|         41|        7.05|    24|
+--------------------+--------------------+-----------+------------+------+

--- Sales ---------------------------------
+------+--------------------+------------------+-----------+
|userId| latestSaleTimestamp|      totalRevenue|totalPoints|
+------+--------------------+------------------+-----------+
|    23|2015-07-29 09:17:...|            255.59|        208|
|    24|2015-07-29 09:17:...|226.08999999999997|        196|
+------+--------------------+------------------+-----------+

--- Aggregation ---------------------------
+------+--------------------+------------------+-----------+
|userId| latestSaleTimestamp|      totalRevenue|totalPoints|
+------+--------------------+------------------+-----------+
|    23|2015-07-29 09:17:...| 267.6400001907349|        299|
|    24|2015-07-29 09:17:...|233.14000019073484|        237|
+------+--------------------+------------------+-----------+

But if the application runs for several iterations, I can see that the performance deteriorates:

[Screenshot: streaming statistics graphs]

I also see a high number of skipped tasks, which increases with every iteration:

[Screenshot: number of skipped tasks per batch]

The first iteration's graph looks like this:

[Screenshot: first iteration's job DAG]

The second iteration's graph looks like this:

[Screenshot: second iteration's job DAG]

The more iterations pass, the longer the graph gets, with more and more skipped stages.

Basically, I think the problem lies in how the results of one iteration are stored for the next iteration. Unfortunately, even after trying a lot of different things and reading the docs, I haven't been able to come up with a solution. Any help is warmly appreciated. Thanks!


1 Answer


This streaming job is not in a 'deadlock', but its execution time increases exponentially with every iteration, resulting in a streaming job that will fail sooner rather than later.

The union->reduce->union->reduce... iterative process on RDDs creates an ever-increasing RDD lineage. Every iteration adds dependencies to that lineage which need to be computed again in the next iteration, which in turn increases the execution time. The dependency (lineage) graph shows that clearly.

One solution is to checkpoint the RDD at regular intervals:

history.checkpoint()
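
A rough sketch of what "regular intervals" could look like in your foreachRDD loop: since a Spark 1.x `DataFrame` has no `checkpoint()` of its own, the sketch goes through the underlying RDD and rebuilds the `DataFrame` afterwards so the query plan is cut as well. The `batchCount` counter and the interval of 10 batches are illustrative choices, not something from your code:

    //Sketch only: checkpoint every N batches (N = 10 is an arbitrary example)
    sc.setCheckpointDir(checkpointDirectory) //safe to call even if ssc.checkpoint(...) already set it

    var batchCount = 0L

    messages.foreachRDD { rdd =>
      if (rdd.count() > 0) {
        //... your existing aggregation producing `agg` ...
        history = agg
        history.cache()

        batchCount += 1
        if (batchCount % 10 == 0) {
          val schema = history.schema
          val rows = history.rdd
          rows.checkpoint()   //mark the underlying RDD for checkpointing
          rows.count()        //force materialization so the checkpoint is actually written
          //rebuild the DataFrame on top of the truncated lineage
          history = sqlContext.createDataFrame(rows, schema)
        }
      }
    }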

You could also explore replacing the union/reduce process with `updateStateByKey`.
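
For reference, a rough sketch of what that could look like; the `UserTotals` case class and the pre-parsed `perUserEvents` stream (userId -> (saleTimestamp, revenue, points)) are assumptions for illustration, not something from your code:

    //State kept per user across batches
    case class UserTotals(latestSaleTimestamp: Long, totalRevenue: Double, totalPoints: Long)

    //Fold a batch's new events for one user into that user's previous state
    def updateTotals(newEvents: Seq[(Long, Double, Long)],
                     state: Option[UserTotals]): Option[UserTotals] = {
      val previous = state.getOrElse(UserTotals(0L, 0.0, 0L))
      val updated = newEvents.foldLeft(previous) { case (acc, (ts, revenue, points)) =>
        UserTotals(math.max(acc.latestSaleTimestamp, ts),
                   acc.totalRevenue + revenue,
                   acc.totalPoints + points)
      }
      Some(updated)
    }

    //perUserEvents: DStream[(Int, (Long, Double, Long))] parsed from the Kafka JSON payloads
    //Requires ssc.checkpoint(...), which your code already calls
    val totalsPerUser = perUserEvents.updateStateByKey(updateTotals)

Spark Streaming then checkpoints that state itself, so the lineage doesn't keep growing; the trade-off is that you have to parse the JSON into key/value pairs up front instead of going through the `SQLContext`.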

  • Thanks a lot for your answer. I was assuming that calling `cache()` or `persist()` would solve the lineage problems, but apparently I misunderstood that point. As `history` is a `DataFrame` and not an `RDD`, I think I can't use `history.checkpoint()`... – Tobi Jul 29 '15 at 09:36
  • Sorry, I had a look at the docs; there is a `history.rdd.checkpoint()`. Using this causes the whole process to stall even faster... Seems like using `cache()` would be the better option. I'm really pulling my hair out over this. Probably I'll have to opt for `updateStateByKey()`, but then I'll lose all the convenience of handling the JSON payloads via `SQLContext`... – Tobi Jul 29 '15 at 09:46
  • Turns out if I remove the `show()` commands, each iteration runs in under 100 ms... To be honest, I don't really understand why this is the case. Is it because the data gets pulled from the partitions? – Tobi Jul 29 '15 at 10:02
  • @Tobi if you remove the `show()`, which action will get executed on the underlying RDD? My guess is that no action will be executed, therefore it's doing nothing. – maasg Jul 29 '15 at 10:30
  • It looks like that, yes. If I add `history.rdd.checkpoint()`, it's right away "stalling" the processes, so that I immediately see some scheduling delay, which increases every iteration. Somehow I don't understand how this (should) work. The amount of data I test with is ridiculously small (~ 30 events / 15 seconds) – Tobi Jul 29 '15 at 10:44
  • Still not sure why checkpointing doesn't solve the problems. Do you maybe have another idea? I'll try the `updateStateByKey` method tomorrow. Sadly, I'll probably lose the convenient JSON handling along the way... – Tobi Jul 29 '15 at 20:25