I submitted a simple Flink job that consumes messages from Kafka, but less than a minute after submission the job fails with the Kafka exception shown below. I have Kafka 2.12 running on my local machine, and I have configured the topic that this job consumes from.

public static void main(String[] args) throws Exception {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> kafkaData = env
            .addSource(new FlinkKafkaConsumer<String>("test-topic",
                    new SimpleStringSchema(), properties));
    kafkaData.print();
    env.execute("Aggregation Job");
}

Here's the exception:

Job has been submitted with JobID 5cc30fe72f685406126e2f5a26f10341
------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 5cc30fe72f685406126e2f5a26f10341)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
 ...
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

I saw another question on Stack Overflow, but its answers did not resolve the problem. I have not configured SSL on the Kafka broker. Any suggestions would be appreciated.

tintin

1 Answer

I had this same issue today. In my case, the problem was that my Flink application was not in a VPC, while my MSK cluster lives in one, so the application could not reach the brokers. After editing the Flink application and moving it into the appropriate VPC, the problem went away.
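Since the timeout while fetching topic metadata came down to a network path problem in my case, a plain TCP check from the machine that actually runs the Flink tasks can help rule that out before touching any Kafka settings. The following is only a minimal sketch: the class name `BrokerReachability` and the helper `canConnect` are hypothetical names made up for illustration, and the default host and port are simply the `bootstrap.servers` value from the question. A successful connection only shows that the port is open; it does not prove the broker's advertised listeners are also reachable.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Flink or Kafka: checks raw TCP reachability only.
public class BrokerReachability {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers connection refused, unknown host, and connect timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults mirror the bootstrap.servers value used in the question.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        boolean ok = canConnect(host, port, 3000);
        System.out.println(host + ":" + port + (ok ? " is reachable" : " is NOT reachable"));
    }
}
```

Run it on the host (or inside the container or VPC) where the Flink TaskManager runs, passing the broker host and port as arguments. If it reports the broker as not reachable, the problem is likely in networking rather than in the job code.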

I realize this question is a few months old, but I figured I'd post my findings in case anyone else happens to come across this from a Google search like I did.