0

I am trying to test simple flink kafka example. mvn package works fine and then I ran ../../flink-1.14.3/bin/flink run -c com.comapny.flinktest.App target/flinktest-1.0-SNAPSHOT.jar and got the following error:

java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer
    at com.optiver.flinktest.App.main(App.java:38)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

My pom.xml:

    <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.company.flinktest</groupId>
  <artifactId>flinktest</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>flinktest</name>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.14.3</version>
        </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.14.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>1.14.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>1.14.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.12</artifactId>
      <version>1.14.3</version>
      <scope>Compile</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-jar-plugin</artifactId>
        <version>3.0.2</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.company.flinktest.App</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
      </plugins>
  </build>
</project>

Folder structure: src/main/java/com/company/flinktest/App.java

Editor: Vscode

App.java Code:

    package com.company.flinktest;
import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.InvalidPropertiesFormatException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
public  class App{
    public static void main(String[] args) throws Exception{
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "kafkaserver:9092");
        properties.setProperty("group.id", "testgroup");
        
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(
            "mytopic",
            new SimpleStringSchema(),
            properties);
        
        DataStream<String> stream = env.addSource(myConsumer);
        stream.print();
        env.execute("flink from kafka");
    }
}

It looks like your post is mostly code; please add some more details. It looks like your post is mostly code; please add some more details. It looks like your post is mostly code; please add some more details. It looks like your post is mostly code; please add some more details. It looks like your post is mostly code; please add some more details. It looks like your post is mostly code; please add some more details.

David
  • 103
  • 1
  • 9

1 Answers1

0

You can refer to my answer in one of the posts. Check the Scala language version though.

whatsinthename
  • 1,828
  • 20
  • 59