While working on Scala Kafka KeyValueMapper
implementation I am getting following error. I am not sure what exactly is the difference.
Thanks for your help.
Code:
I created a
KTable
from topic.val creducer: Reducer[java.lang.Long] = (v1, v2) => if (v1 > v2) v1 else v2 val deduplicationWindow = TimeWindows .of(60000L * 10) .advanceBy(60000L) .until(60000L * 10) val ktwindow: KTable[Windowed[String], java.lang.Long] = ipandTime .groupByKey(Serdes.String(), Serdes.Long()) .reduce(creducer, deduplicationWindow, "ktwindow-query")
I am getting error while using selectKey method when I try to create stream with key of
Windowed[String]
. Similar implementation in java works fine.val fStream = ktwindow .toStream() .selectKey( new KeyValueMapper[Windowed[String], java.lang.Long, KeyValue[String, java.lang.Long]] { override def apply( key: Windowed[String], value: java.lang.Long): KeyValue[String, java.lang.Long] = { new KeyValue(key.key(), value) } } )
[error] found : org.apache.kafka.streams.kstream.KeyValueMapper[org.apache.kafka.streams.kstream.Windowed[String],Long,org.apache.kafka.streams.KeyValue[String,Long]]
[error] required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: org.apache.kafka.streams.kstream.Windowed[String], _ >: Long, _ <: KR]