I have some sales-related JSON data in my ElasticSearch cluster, and I would like to use Spark Streaming (Spark 1.4.1) to aggregate incoming sales events from my eCommerce website via Kafka, so that I have an up-to-date view of each user's total sales (in terms of revenue and products).

What's not clear to me from the docs I read is how to load the historical data from ElasticSearch when the Spark application starts, and then calculate, for example, the overall revenue per user (based on both the history and the incoming sales from Kafka).

I have the following (working) code to connect to my Kafka instance and receive the JSON documents:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

object ReadFromKafka {
  def main(args: Array[String]) {

    val checkpointDirectory = "/tmp"
    val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[2]")
    val topicsSet = Array("tracking").toSet

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))

    // Create direct kafka stream with brokers and topics
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    //Iterate
    messages.foreachRDD { rdd =>

      // Only process non-empty batches (isEmpty avoids a full count)
      if (!rdd.isEmpty()) {

        // Create an SQLContext and parse the JSON payloads
        val sqlContext = new SQLContext(sc)
        val trackingEvents = sqlContext.read.json(rdd.values)

        //Sample aggregation of incoming data
        trackingEvents.groupBy("type").count().show()

      }

    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

I know that there's a plugin for ElasticSearch (https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html#spark-read), but it's not clear to me how to combine the read at startup with the streaming calculation, so that the historical data is aggregated together with the streaming data.

Help is much appreciated! Thanks in advance.

Tobi

1 Answer

RDDs are immutable, so once one is created you cannot add data to it, for example to update the revenue with new events.

What you can do is union the existing data with the new events to create a new RDD, which you can then use as the current total. For example:

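// Running totals; the initial load from ElasticSearch is left elided here
var currentTotal: RDD[(Key, Value)] = ... //read from ElasticSearch
messages.foreachRDD { rdd =>
    // rdd is assumed to have been mapped to the same (Key, Value) pair type
    currentTotal = currentTotal.union(rdd)
}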

Here currentTotal is a var because each union produces a new RDD, and the variable is reassigned to reference it.
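The initial read from ElasticSearch is left open above. One possible way to do it, assuming the elasticsearch-hadoop Spark connector is on the classpath, is its `esRDD` method. This is only a sketch: the field names `userId` and `revenue`, the object name `LoadHistory` and both helper functions are made up for illustration and will differ for your documents.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark._ // adds esRDD to SparkContext

object LoadHistory {
  // Assumed (hypothetical) document layout: {"userId": "u1", "revenue": 12.5}
  // Returns None when either field is missing from the document.
  def toUserRevenue(doc: scala.collection.Map[String, AnyRef]): Option[(String, Double)] =
    for {
      user    <- doc.get("userId")
      revenue <- doc.get("revenue")
    } yield (user.toString, revenue.toString.toDouble)

  // Reads every document of `resource` ("index/type") and sums revenue per user.
  def loadTotals(sc: SparkContext, resource: String): RDD[(String, Double)] =
    sc.esRDD(resource)                                  // pairs of (documentId, fields)
      .flatMap { case (_, doc) => toUserRevenue(doc).toList }
      .reduceByKey(_ + _)
}
```

The result of something like `LoadHistory.loadTotals(sc, "sales/order")` (a made-up index/type) could then serve as the starting value of currentTotal. The connector reads the cluster address from settings such as `es.nodes` on the SparkConf.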

After the union you may want to perform some further operations such as reducing the values which belong to the same Key, but you get the picture.
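To make the "union, then reduce per key" step concrete, here is a small sketch. It assumes the events have already been turned into (userId, revenue) pairs; the names `UnionThenReduce` and `mergeBatch` and the sample values are made up, and plain in-memory RDDs stand in for the ElasticSearch history and the Kafka batch.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object UnionThenReduce {
  // Merge one batch of (userId, revenue) pairs into the running totals.
  // The partition count is pinned so it does not grow with every union.
  def mergeBatch(totals: RDD[(String, Double)],
                 batch: RDD[(String, Double)]): RDD[(String, Double)] =
    totals.union(batch).reduceByKey(_ + _, totals.partitions.length)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UnionThenReduce").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Stand-in for the totals loaded from ElasticSearch (made-up values)
      val history = sc.parallelize(Seq("alice" -> 100.0, "bob" -> 40.0))
      // Stand-in for one micro-batch of sales parsed from Kafka
      val batch = sc.parallelize(Seq("alice" -> 25.0, "carol" -> 10.0))

      mergeBatch(history, batch).collect().sortBy(_._1).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Inside `foreachRDD` the same helper would be used as `currentTotal = mergeBatch(currentTotal, parsedBatch)`, where `parsedBatch` is whatever pair RDD you derive from the Kafka messages.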

If you use this technique, note that the lineage of your RDDs will grow, as each newly created RDD references its parent. A very long lineage can eventually cause a stack overflow. To avoid this, call checkpoint() on the RDD periodically (a checkpoint directory has to be set first with SparkContext.setCheckpointDir()).
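A rough sketch of periodic checkpointing could look like the following. The checkpoint path, the interval and the names `PeriodicCheckpoint` and `run` are assumptions for illustration, and a simple loop simulates the micro-batches that would normally arrive through `foreachRDD`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PeriodicCheckpoint {
  // Folds `batches` simulated micro-batches into the totals and
  // truncates the lineage every `every` batches.
  def run(sc: SparkContext, batches: Int, every: Int): RDD[(String, Double)] = {
    var totals: RDD[(String, Double)] = sc.parallelize(Seq("alice" -> 0.0), 2)
    for (i <- 1 to batches) {
      val batch = sc.parallelize(Seq("alice" -> 1.0), 2)
      totals = totals.union(batch).reduceByKey(_ + _, 2)
      if (i % every == 0) {
        totals.cache()      // avoid recomputing the RDD when it is written out
        totals.checkpoint() // must be called before any action runs on this RDD
        totals.count()      // an action, which triggers the actual checkpoint
      }
    }
    totals
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PeriodicCheckpoint").setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("/tmp/spark-checkpoints") // assumed path
    try {
      run(sc, batches = 30, every = 10).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

After a checkpoint the RDD is read back from the checkpoint files instead of being recomputed through the whole chain of unions, so the lineage stays short.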

Patrick McGloin
  • Thanks a lot for your answer. I already suspected that `rdd.union()` would be a good way to start. Would `updateStateByKey()` also be a way to do this? As I want to persist the aggregations, I thought that this might come in handy as well... – Tobi Jul 27 '15 at 11:38
  • I think you could also use `updateStateByKey()`. Previously there was no way to specify the starting state for `updateStateByKey()` but I believe that has been added so it should also be a viable solution. – Patrick McGloin Jul 27 '15 at 11:52
  • Would it also be possible to do this with DataFrames? Somehow I can't get it to work... – Tobi Jul 28 '15 at 14:27
  • I assume you mean: is the union -> reduceByKey pattern possible with DataFrames? Off the top of my head, I think it isn't. The reason is that you want to use the PairRDDFunctions such as union and reduceByKey. Looking into this, I found a post about grouping data sets using DataFrames, so you may want to look into that. But I think using RDDs might be easier in this case. https://forums.databricks.com/questions/956/how-do-i-group-my-dataset-by-a-key-or-combination.html – Patrick McGloin Jul 28 '15 at 15:31
  • How do you ensure that you aren't processing the same record twice? I.e., data is still streaming in while the ElasticSearch data is being read, so you could receive a streaming message that also exists at the tail end of the initial load query. I hope this makes sense; I imagine it is a fairly common problem? – ShaunO Dec 02 '15 at 02:19
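The `updateStateByKey()` route discussed in the comments above could look roughly like this. It is only a sketch: it uses the overload that takes an initial RDD, and it assumes the Kafka messages have already been mapped to (userId, revenue) pairs. The names `RunningRevenue`, `updateRevenue` and `withHistory` are made up for illustration.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object RunningRevenue {
  // New state for one user = previous total (0.0 if unseen) + this batch's revenue.
  val updateRevenue: (Seq[Double], Option[Double]) => Option[Double] =
    (newValues, state) => Some(state.getOrElse(0.0) + newValues.sum)

  // `events`:  (userId, revenue) pairs derived from the Kafka stream
  // `history`: per-user totals loaded once at startup, e.g. from ElasticSearch
  def withHistory(events: DStream[(String, Double)],
                  history: RDD[(String, Double)],
                  numPartitions: Int): DStream[(String, Double)] =
    events.updateStateByKey[Double](
      updateRevenue,
      new HashPartitioner(numPartitions),
      history) // seeds the state before the first batch is processed
}
```

Note that `updateStateByKey()` needs a checkpoint directory on the streaming context (`ssc.checkpoint(...)`), which also takes care of the lineage growth described in the answer.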