
I'm attempting to connect to and read from Kafka (2.1) on my local machine, in the scala-shell that comes with Flink (1.7.2).

Here's what I'm doing:

:require flink-connector-kafka_2.11-1.7.1.jar
:require flink-connector-kafka-base_2.11-1.7.1.jar

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import java.util.Properties

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()

After the last statement, I'm getting the following error:

scala> var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()
<console>:69: error: overloaded method value addSource with alternatives:
  [T](function: org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T] => Unit)(implicit evidence$10: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T] <and>
  [T](function: org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit evidence$9: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
 cannot be applied to (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer[String])
   var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()

I have created the topic named "topic" and I'm able to correctly produce and consume messages from it through another client. I'm using Java 1.8.0_201 and following the instructions from https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html.

Any help on what could be going wrong?

div93

2 Answers


Dependencies often need other dependencies, transitively. We usually use a dependency manager like Maven or sbt, and when we add a dependency to the project, the manager resolves and provides its transitive dependencies in the background.
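For example, a minimal build.sbt sketch (the coordinates are the connector's real ones for Flink 1.7.2, but the file itself is illustrative) is enough for sbt to pull the connector's transitive dependencies, such as flink-connector-kafka-base and kafka-clients, automatically:

// build.sbt (sketch): sbt resolves the connector's transitive
// dependencies (flink-connector-kafka-base, kafka-clients, ...) for us
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.7.2"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.7.2"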

On the other hand, when you use a shell, there is no dependency manager, so you are responsible for providing your code's dependencies yourself. Using the Flink Kafka connector explicitly requires the Flink Connector Kafka jar, but note that Flink Connector Kafka has dependencies of its own. You can find its dependencies at the bottom of its Maven repository page, in the Compile Dependencies section. Starting from this premise, I added the following jar files to the directory FLINK_HOME/lib (the Flink classpath):

flink-connector-kafka-0.11_2.11-1.4.2.jar
flink-connector-kafka-0.10_2.11-1.4.2.jar    
flink-connector-kafka-0.9_2.11-1.4.2.jar   
flink-connector-kafka-base_2.11-1.4.2.jar  
flink-core-1.4.2.jar                                         
kafka_2.11-2.1.1.jar
kafka-clients-2.1.0.jar

and I could successfully consume Kafka messages using the following code in the Flink shell:

scala> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

scala> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

scala> import java.util.Properties
import java.util.Properties

scala> val properties = new Properties()
properties: java.util.Properties = {}

scala> properties.setProperty("bootstrap.servers", "localhost:9092")
res0: Object = null

scala> properties.setProperty("group.id", "test")
res1: Object = null

scala> val stream = senv.addSource(new FlinkKafkaConsumer011[String]("topic", new SimpleStringSchema(), properties)).print()
warning: there was one deprecation warning; re-run with -deprecation for details
stream: org.apache.flink.streaming.api.datastream.DataStreamSink[String] = org.apache.flink.streaming.api.datastream.DataStreamSink@71de1091

scala> senv.execute("Kafka Consumer Test")
Submitting job with JobID: 23e3bb3466d914a2747ae5fed293a076. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:40093/user/jobmanager#1760995711] with leader session id 00000000-0000-0000-0000-000000000000.
03/11/2019 21:42:39 Job execution switched to status RUNNING.
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to SCHEDULED 
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to SCHEDULED 
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to DEPLOYING 
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to DEPLOYING 
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING 
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING 
hello
hello
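The two hello records above were test messages sent to the topic. A typical way to produce such messages, assuming Kafka's standard CLI tools are available, is the console producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic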

Alternatively, another way to add jar files to the Flink classpath is to pass the jars as arguments to the Flink shell start command:

bin/start-scala-shell.sh local "--addclasspath <path/to/jar.jar>"
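
For example (the path is illustrative), loading the universal Kafka connector jar at startup might look like:

bin/start-scala-shell.sh local "--addclasspath /path/to/flink-connector-kafka_2.11-1.7.2.jar"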

Test environment:

Flink 1.4.2
Kafka 2.1.0
Java  1.8.0_201
Scala 2.11
Soheil Pourbafrani
  • Wow. Thanks a lot for the explanation and the example. I'll run it and check. I thought all dependencies were met since there wasn't any missing class or dependency error. Very helpful! – div93 Mar 11 '19 at 18:44
  • At first, I got the same error as you. Just download all the dependencies I listed and put them in FLINK_HOME/lib. I hope this helps. – Soheil Pourbafrani Mar 11 '19 at 18:49
  • @div93 Since I use Flink 1.4.2, I got its corresponding dependencies. You should get the dependencies for your corresponding version, 1.7.2. – Soheil Pourbafrani Mar 11 '19 at 18:51

Most probably you should import Flink's Scala implicits before adding a source:

import org.apache.flink.streaming.api.scala._
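
This import brings the implicit TypeInformation evidence that both addSource overloads require (note the implicit evidence parameters in the error message); without it, overload resolution fails with the misleading error above. A minimal sketch of the corrected session, with the topic, schema, and properties taken from the question:

import org.apache.flink.streaming.api.scala._  // implicit TypeInformation evidence for addSource
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import java.util.Properties

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

// compiles once the Scala implicits are in scope
val stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
stream.print()
senv.execute("Kafka Consumer Test")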
tgrez
  • another thing that I can see is a version mismatch (1.7.2 vs 1.7.1 in kafka connector), maybe unifying this will help? – tgrez Mar 11 '19 at 09:35
  • I tested with v1.7.1. It didn't resolve this error, but introduced a new one: scala> var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print() error: error while loading Function, class file '/Users/path/flink-1.7.1/lib/flink-dist_2.11-1.7.1.jar(org/apache/flink/api/common/functions/Function.class)' has location not matching its contents: contains class Function :69: error: overloaded method value addSource with alternatives: – div93 Mar 11 '19 at 18:59