I'm attempting to connect to and read from Kafka (2.1) on my local machine, in the scala-shell that comes with Flink (1.7.2).
Here's what I'm doing :
:require flink-connector-kafka_2.11-1.7.1.jar
:require flink-connector-kafka-base_2.11-1.7.1.jar
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import java.util.Properties
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()
After, the last statement I'm getting the following error :
scala> var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()
<console>:69: error: overloaded method value addSource with alternatives:
[T](function: org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T] => Unit)(implicit evidence$10: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T] <and>
[T](function: org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit evidence$9: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
cannot be applied to (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer[String])
var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()
I have created the topic named "topic" and I'm able to produce and read messages from it, through another client correctly. I'm using java version 1.8.0_201 and following the instructions from https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html .
Any help on what could be going wrong?