
While working on a Scala implementation of a Kafka Streams KeyValueMapper, I am getting the following error. I am not sure what exactly the difference is between the found and the required type. Thanks for your help.

Code:

  1. I created a KTable from a topic.

    val creducer: Reducer[java.lang.Long] =
      (v1, v2) => if (v1 > v2) v1 else v2
    
    val deduplicationWindow = TimeWindows
      .of(60000L * 10)
      .advanceBy(60000L)
      .until(60000L * 10)
    
    val ktwindow: KTable[Windowed[String], java.lang.Long] =
      ipandTime
        .groupByKey(Serdes.String(), Serdes.Long())
        .reduce(creducer, deduplicationWindow, "ktwindow-query")
    
  2. I get an error from the selectKey method when I try to create a stream from the table keyed by Windowed[String]. A similar implementation in Java works fine.

    val fStream = ktwindow
      .toStream()
      .selectKey(
        new KeyValueMapper[Windowed[String],
                           java.lang.Long,
                           KeyValue[String, java.lang.Long]] {
          override def apply(
              key: Windowed[String],
              value: java.lang.Long): KeyValue[String, java.lang.Long] = {
            new KeyValue(key.key(), value)
          }
        }
      )
    
[error]  found   : org.apache.kafka.streams.kstream.KeyValueMapper[org.apache.kafka.streams.kstream.Windowed[String],Long,org.apache.kafka.streams.KeyValue[String,Long]]

[error]  required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: org.apache.kafka.streams.kstream.Windowed[String], _ >: Long, _ <: KR]
Andrey Tyukin
  • Can you please share the code snippet and context to understand the question better ? – shuvomiah Apr 01 '18 at 07:51
  • I have edited the question with code snippet. – bijaya media Apr 01 '18 at 13:51
  • Looks like a common issue with type inference when using Java's call-site generics. Try attaching the explicit type parameters to `.groupByKey[Param1,...,ParamN](...)` and `.reduce[Param1, ..., ParamM](...)`, if the types actually do match, the errors will go away. – Andrey Tyukin Apr 01 '18 at 16:30
  • Hi @AndreyTyukin, I am not sure I am doing it correctly. For example, the API of groupByKey is `KGroupedStream groupByKey(Serde keySerde, Serde valSerde)`. What would Param1 and Param2 be for my code? `ipandTime.groupByKey[Param1, Param2](Serdes.String(), Serdes.Long())` – bijaya media Apr 02 '18 at 17:40

1 Answer


The variable ipandTime is not defined anywhere in the question, so I've replaced it with ???, but that has nothing to do with the actual problem.

As I said, if type inference for Java use-site wildcards fails, simply add explicit type arguments. The following compiles against Kafka 1.1.0:

import org.apache.kafka.streams.kstream._
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.KeyValue


object Q49594920 {

    val creducer: Reducer[java.lang.Long] =
      (v1, v2) => if (v1 > v2) v1 else v2

    val deduplicationWindow = TimeWindows
      .of(60000L * 10)
      .advanceBy(60000L)
      .until(60000L * 10)

    val ktwindow: KTable[Windowed[String], java.lang.Long] = ???
      // ipandTime // What's that? It's not defined anywhere!
      //   .groupByKey(Serdes.String(), Serdes.Long())
      //   .reduce(creducer, deduplicationWindow, "ktwindow-query")

    val fStream = ktwindow
      .toStream()
      .selectKey[KeyValue[String, java.lang.Long]](
        new KeyValueMapper[Windowed[String],
                           java.lang.Long,
                           KeyValue[String, java.lang.Long]] {
          override def apply(
              key: Windowed[String],
              value: java.lang.Long): KeyValue[String, java.lang.Long] = {
            new KeyValue(key.key(), value)
          }
        }
      )
}

The selectKey method expects a generic type argument KR, which the Scala compiler could not infer from the wildcard-typed parameter. Supplying the concrete type KeyValue[String, java.lang.Long] explicitly was enough to make it compile.
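One thing to keep in mind: whatever type is passed as KR becomes the key type of the resulting stream, so with KR = KeyValue[String, java.lang.Long] the new key is the whole KeyValue, not the String. If a plain String key is what you want, KR = String with a mapper that returns key.key() may be closer to the intent. The sketch below illustrates the difference. It uses small hypothetical stand-ins (Windowed, KeyValue, KeyValueMapper, KStream) that only mimic the shape of the Kafka Streams signatures, so that it runs without the Kafka dependency. It is not the real API, and the sample record is made up.

```scala
// Hypothetical stand-ins that only mimic the shape of the Kafka Streams types.
// They are NOT the real classes; they exist so the sketch runs without Kafka.
object SelectKeySketch {
  final case class Windowed[K](key: K)
  final case class KeyValue[K, V](key: K, value: V)

  trait KeyValueMapper[K, V, R] {
    def apply(key: K, value: V): R
  }

  final case class KStream[K, V](records: List[(K, V)]) {
    // Same shape as the Java signature
    // <KR> KStream<KR, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends KR>)
    def selectKey[KR](mapper: KeyValueMapper[_ >: K, _ >: V, _ <: KR]): KStream[KR, V] =
      KStream[KR, V](records.map { case (k, v) => (mapper.apply(k, v), v) })
  }

  // Made-up sample record standing in for ktwindow.toStream()
  val windowed: KStream[Windowed[String], Long] =
    KStream(List((Windowed("10.0.0.1"), 5L)))

  // KR = KeyValue[String, Long]: the whole KeyValue becomes the new key.
  val byKeyValue: KStream[KeyValue[String, Long], Long] =
    windowed.selectKey[KeyValue[String, Long]](
      new KeyValueMapper[Windowed[String], Long, KeyValue[String, Long]] {
        override def apply(key: Windowed[String], value: Long): KeyValue[String, Long] =
          KeyValue(key.key, value)
      })

  // KR = String: only the unwrapped window key becomes the new key.
  val byString: KStream[String, Long] =
    windowed.selectKey[String](
      new KeyValueMapper[Windowed[String], Long, String] {
        override def apply(key: Windowed[String], value: Long): String = key.key
      })
}
```

With the real Kafka classes, the second variant would correspond to calling .selectKey[String](...) with a KeyValueMapper[Windowed[String], java.lang.Long, String].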

Andrey Tyukin