I tried submitting a simple Flink job that consumes messages from Kafka, but less than a minute after submission the job fails with the Kafka exception shown below. I have Kafka 2.12 running on my local machine, and I have already created the topic that this job consumes from.
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> kafkaData = env
                .addSource(new FlinkKafkaConsumer<String>("test-topic",
                        new SimpleStringSchema(), properties));

        kafkaData.print();
        env.execute("Aggregation Job");
    }
Here's the exception:
Job has been submitted with JobID 5cc30fe72f685406126e2f5a26f10341
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 5cc30fe72f685406126e2f5a26f10341)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
...
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
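To rule out basic connectivity, a plain TCP check along these lines could be run from the machine where the job executes. This is only a minimal sketch: the class name `BrokerReachability`, the helper `isReachable`, and the 2-second timeout are hypothetical choices for illustration, and the host and port are the ones from the `bootstrap.servers` setting above. A successful connect shows only that something is listening on the port; it does not prove that a Kafka metadata request would succeed.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Flink or Kafka: checks whether a TCP
// connection to host:port can be opened within the given timeout.
class BrokerReachability {

    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // connect() throws an IOException subtype on refusal or timeout
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Same address as bootstrap.servers in the job above
        String host = "127.0.0.1";
        int port = 9092;
        boolean open = isReachable(host, port, 2000); // 2 s timeout is an arbitrary choice
        System.out.println(host + ":" + port
                + (open ? " accepts TCP connections" : " is not reachable"));
    }
}
```

If the job runs on a cluster rather than locally, the check would need to run on the TaskManager hosts, since `127.0.0.1` resolves to whichever machine executes the source.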
I saw another question about this on Stack Overflow, but its answers did not resolve the problem. I have not configured SSL on the Kafka broker. Any suggestions would be appreciated.