I receive data from Kafka in the following format, where null is the key:
null,val1,val2,val3,val4,val5,val6,val7,...val23
null,val1,val2,val3,val4,val5,val6,val7,...val23
null,val1,val2,val3,val4,val5,val6,val7,...val23
I have now mapped the values to remove the null key and form new key-value pairs using the following code:
val topics = Array("kafka-topic")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
streamingContext.checkpoint("hdfs:///hdfs/location")
val records = stream.map(record => record.value().toString)
// map (not flatMap) so each line stays together as one Array[String];
// flatMap would flatten everything into individual tokens
val rdds = records.transform { pps =>
  pps.map(_.split(","))
}
val ppds = rdds.transform { pair =>
  pair.map(vals =>
    (vals(2), Set(vals(1).toLong, vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), ..., vals(23))))
}
where vals(2), a String, will be the key and the remaining 22 values will be the values.
I am now trying to compute the average of all the values per key over a 20-second window and continuously push the calculated per-key averages into a data store (HBase). In batch mode I understand there is an aggregateByKey() method that allows this.
How can this be achieved in streaming mode?
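One direction I am considering is reduceByKeyAndWindow: keep a running (sum, count) per key and divide once at the end. This is only a sketch, untested, and it assumes ppds carries each key with its values as a Seq[String] (my actual code uses a Set):

```scala
import org.apache.spark.streaming.Seconds
import scala.util.Try

// Assumes ppds is a DStream[(String, Seq[String])]: key -> the 22 raw values.
val averages = ppds
  // reduce each record to a partial (sum, count), parsing values as Double
  .mapValues { vals =>
    val nums = vals.flatMap(v => Try(v.toDouble).toOption)
    (nums.sum, nums.size.toLong)
  }
  // combine the partial (sum, count) pairs per key over a 20-second window
  .reduceByKeyAndWindow(
    (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2),
    Seconds(20), Seconds(20))
  // divide once at the end to get the per-key average
  .mapValues { case (sum, count) => if (count == 0) 0.0 else sum / count }

// push each micro-batch to HBase, one connection per partition
averages.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // open an HBase connection here, write each (key, average) row, close it
  }
}
```

Is this (sum, count) approach the right way to do a windowed average, or is there something closer to aggregateByKey for DStreams?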
There is also the possibility that some of the values are strings. How do I skip over the string values and calculate the average of only the numeric ones, while continuously pushing updates to HBase?
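For the string-skipping part, the approach I have in mind is to attempt the numeric parse with scala.util.Try and drop the failures. A minimal sketch (numericAverage is a hypothetical helper, not part of my code):

```scala
import scala.util.Try

// Hypothetical helper: keep only the values that parse as Double, then average.
// Non-numeric strings are silently dropped; if nothing parses, return None.
def numericAverage(values: Seq[String]): Option[Double] = {
  val nums = values.flatMap(v => Try(v.toDouble).toOption)
  if (nums.isEmpty) None else Some(nums.sum / nums.size)
}
```

Is this a reasonable way to filter, or will the per-value Try be too slow at streaming rates?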