
I receive data from Kafka in the following format, where null is the key.

null,val1,val2,val3,val4,val5,val6,val7,...val23
null,val1,val2,val3,val4,val5,val6,val7,...val23
null,val1,val2,val3,val4,val5,val6,val7,...val23

I have now mapped the values to remove the null key and form new key and value pairs using the following code.

    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    val topics = Array("kafka-topic")
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    streamingContext.checkpoint("hdfs:///hdfs/location")

    // Keep only the comma-separated value string; the null Kafka key is dropped here
    val records = stream.map(record => record.value().toString)

    // Split each record into its fields (flatMap would flatten away the record boundaries)
    val fields = records.map(_.split(","))

    // vals(2) becomes the key; the remaining 22 fields become the values
    val pairs = fields.map(vals => (vals(2), vals.take(2) ++ vals.drop(3)))

Here vals(2), a String, will be the key and the remaining 22 values will be the values.

I am now trying to get the average of all the values per key for a time window of 20 seconds and continuously push the calculated averages per key into a data store (HBase). In batch mode I understand that there is an aggregateByKey() method that allows you to do this.
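For example, in batch mode I could compute it roughly like this (a minimal sketch, assuming pairRDD is an RDD[(String, Double)]; the name is illustrative):

    // Per-key average in batch mode with aggregateByKey: accumulate (sum, count), then divide
    val averages = pairRDD
      .aggregateByKey((0.0, 0L))(
        (acc, v) => (acc._1 + v, acc._2 + 1L),   // fold one value into the running (sum, count)
        (a, b) => (a._1 + b._1, a._2 + b._2))    // merge partial (sum, count) pairs
      .mapValues { case (sum, count) => sum / count }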

How can this be achieved in streaming mode?

There is also a possibility that some of the values are strings. How do I skip over the values that are strings and calculate the average of only the numerical values, while continuously pushing updates to HBase?

vva
  • Although it wasn't for streaming, I asked and answered a similar question a while back, and it (the following URL) may help you and others: https://stackoverflow.com/questions/29930110/calculating-the-averages-for-each-key-in-a-pairwise-k-v-rdd-in-spark-with-pyth – NYCeyes Jan 12 '18 at 01:04

2 Answers


Use reduceByKeyAndWindow,

    // Reduce the last 30 seconds of data, every 10 seconds
    val aggregateFunction = (a: Int, b: Int) => a + b
    val pairDStream = ... // a DStream containing (word, 1) pairs
    val windowedWordCounts = pairDStream.reduceByKeyAndWindow(aggregateFunction, Seconds(30), Seconds(10))

The example above calculates word counts over a window period. Instead of a simple addition function, you can write a more complex aggregate function and use it with reduceByKeyAndWindow, as in the sketch below.
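To get the averages asked about here, one option is to reduce (sum, count) pairs over a 20-second window and divide at the end. A minimal sketch, assuming keyedValues is a DStream[(String, Seq[Double])] built from the question's records (the name is illustrative):

    import org.apache.spark.streaming.Seconds

    // Pre-aggregate each record to (sum, count), then sum those pairs per key over the window
    val sumsAndCounts = keyedValues.mapValues(vs => (vs.sum, vs.size.toLong))

    val windowedAverages = sumsAndCounts
      .reduceByKeyAndWindow(
        (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2),
        Seconds(20), Seconds(20))
      .mapValues { case (sum, count) => sum / count }

Each 20-second batch of windowedAverages can then be written to HBase from foreachRDD.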

For more information
https://docs.cloud.databricks.com/docs/latest/databricks_guide/07%20Spark%20Streaming/10%20Window%20Aggregations.html

SanthoshPrasad

You can use something like this:

    // Map each hashtag to a key/value pair of (hashtag, 1) so we can count them up by adding up the values
    val hashtagKeyValues = hashtags.map(hashtag => (hashtag, 1))

    // Now count them up over a 5 minute window sliding every one second
    val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow((x, y) => x + y, (x, y) => x - y, Seconds(300), Seconds(1))
    // You will often see this written in the following shorthand:
    // val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow(_ + _, _ - _, Seconds(300), Seconds(1))

    // Sort the results by the count values
    val sortedResults = hashtagCounts.transform(rdd => rdd.sortBy(x => x._2, false))
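The same pattern can be adapted to the per-key averages from the question: keep only the fields that parse as numbers, reduce (sum, count) pairs over the window, and push each batch out from foreachRDD. A rough sketch, assuming keyed is a DStream[(String, Seq[String])] like the one built in the question; the HBase write itself is only indicated by a comment, and the inverse-reduce form requires checkpointing to be enabled:

    import scala.util.Try
    import org.apache.spark.streaming.Seconds

    // Drop values that are not numeric and pre-aggregate each record to (sum, count)
    val sumCounts = keyed.flatMapValues { values =>
      val numeric = values.flatMap(v => Try(v.toDouble).toOption)
      if (numeric.nonEmpty) Seq((numeric.sum, numeric.size.toLong)) else Seq.empty
    }

    // Maintain (sum, count) per key over a 20-second window sliding every 5 seconds
    val windowed = sumCounts.reduceByKeyAndWindow(
      (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2),
      (a: (Double, Long), b: (Double, Long)) => (a._1 - b._1, a._2 - b._2),
      Seconds(20), Seconds(5))

    // Convert to averages and write every batch to the data store
    windowed.mapValues { case (sum, count) => sum / count }.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // open an HBase connection here, write one Put per (key, average), then close it
        partition.foreach { case (key, avg) => println(s"$key -> $avg") } // placeholder for the actual HBase Put
      }
    }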
KayV