I am new to Java as well as to Apache Flink. I have created (Apache Maven project) using Eclipse with Java. This is a Flink program that reads data from the Apache Kafka topic.
I am using:
- Eclipse
- Flink (version: 1.7.2)
- Java (version: 1.8)
This is my program:
import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
public class FlinkApplication {
public static void main(String[] args) throws Exception {
String bootstrapServers = "XX.XX.XXX.XXX:XXX";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set up the Consumer and create a datastream from this source
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", bootstrapServers);
FlinkKafkaConsumer<String> flinkConsumer = new FlinkKafkaConsumer<>(
"first_topic", // input topic
new SimpleStringSchema(), // serialization schema
properties); // properties
DataStream<String> readingStream = env.addSource(flinkConsumer);
env.execute();
}
}
Along with this code, below is my pom.xml
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.elayat.flink</groupId>
<artifactId>FlinkApplication</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>FlinkApplication</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.7.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.7.2</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.7.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.elayat.flink.FlinkApplication.FlinkApplication</mainClass>
</transformer>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>javax/xml/**</exclude>
<exclude>org/apache/xerces/**</exclude>
<exclude>org/w3c/**</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</plugin>
</plugins>
</build>
</project>
I have compiled this program as a jar file and load it to the Flink dashboard but it gives the following error:
We're sorry, something went wrong. The server responded with: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error. [![enter image description here][1]][1]
When I checked my logs of the Flink cluster it shows the following WARN and error:
2021-05-18 14:31:25,167 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Exception occurred in REST handler: org.apache.flink.client.program.ProgramInvocationException: Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file.
2021-05-18 14:31:57,982 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2021-05-18 14:31:58,003 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Exception occurred in REST handler: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
2021-05-18 19:32:32,124 ERROR org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler - Exception occurred in REST handler: File 8b2141cd-a854-4958-b192-efeb58db63e6_FlinkApplication-0.0.1-SNAPSHOT.jar does not exist in /tmp/flink-web-38332dc6-8876-4610-92a2-8ad4badc2460/flink-web-upload.
2021-05-18 19:33:07,093 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2021-05-18 19:33:07,116 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Exception occurred in REST handler: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
2021-05-18 19:33:42,322 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2021-05-18 19:33:42,342 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Exception occurred in REST handler: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
2021-05-18 19:34:03,232 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2021-05-18 19:34:03,252 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Exception occurred in REST handler: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
Here is the Error showed by the jar file when I tried it in the cmd to see whether it is executible or not.
Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/functions/source/SourceFunction
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
at java.lang.Class.getMethod0(Class.java:3018)
at java.lang.Class.getMethod(Class.java:1784)
at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:650)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:632)
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.streaming.api.functions.source.SourceFunction
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 7 more