I am new to both Java and Apache Flink. I have created an Apache Maven project in Eclipse. It is a Flink program that reads data from an Apache Kafka topic.

I am using:

  • Eclipse
  • Flink (version: 1.7.2)
  • Java (version: 1.8)

This is my program:


import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class FlinkApplication {

    public static void main(String[] args) throws Exception {

        String bootstrapServers = "XX.XX.XXX.XXX:XXX";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set up the consumer and create a DataStream from this source
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        FlinkKafkaConsumer<String> flinkConsumer = new FlinkKafkaConsumer<>(
            "first_topic",                // input topic
            new SimpleStringSchema(),     // deserialization schema
            properties);                  // Kafka consumer properties

        DataStream<String> readingStream = env.addSource(flinkConsumer);

        env.execute();
    }
}

Along with this code, here is my pom.xml:

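<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">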
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.elayat.flink</groupId>
  <artifactId>FlinkApplication</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>FlinkApplication</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>

    <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.6</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/log4j/log4j -->
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.30</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.7.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>1.7.2</version>
      <scope>provided</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>1.7.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.12</artifactId>
      <version>1.7.2</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
  
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-shade-plugin</artifactId>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <transformers>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>com.elayat.flink.FlinkApplication.FlinkApplication</mainClass>
            </transformer>
          </transformers>
          <filters>
            <filter>
              <artifact>*:*</artifact>
              <excludes>
                <exclude>javax/xml/**</exclude>
                <exclude>org/apache/xerces/**</exclude>
                <exclude>org/w3c/**</exclude>
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
          </filters>
        </configuration>
      </plugin>
    </plugins>
  </build>
  
 
</project>

I compiled this program into a jar file and uploaded it to the Flink dashboard, but it gives the following error:

We're sorry, something went wrong. The server responded with: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.

When I checked the logs of the Flink cluster, they showed the following warnings and errors:

2021-05-18 14:31:25,167 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Exception occurred in REST handler: org.apache.flink.client.program.ProgramInvocationException: Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file.

2021-05-18 14:31:57,982 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.

2021-05-18 14:31:58,003 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Exception occurred in REST handler: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.

2021-05-18 19:32:32,124 ERROR org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler  - Exception occurred in REST handler: File 8b2141cd-a854-4958-b192-efeb58db63e6_FlinkApplication-0.0.1-SNAPSHOT.jar does not exist in /tmp/flink-web-38332dc6-8876-4610-92a2-8ad4badc2460/flink-web-upload.

2021-05-18 19:33:07,093 WARN  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.

2021-05-18 19:33:07,116 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Exception occurred in REST handler: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.

2021-05-18 19:33:42,322 WARN  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.

2021-05-18 19:33:42,342 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Exception occurred in REST handler: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.

2021-05-18 19:34:03,232 WARN  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.

2021-05-18 19:34:03,252 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Exception occurred in REST handler: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.

Here is the error I get when I run the jar file from cmd to check whether it is executable:

Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/functions/source/SourceFunction
        at java.lang.Class.getDeclaredMethods0(Native Method)
        at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
        at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
        at java.lang.Class.getMethod0(Class.java:3018)
        at java.lang.Class.getMethod(Class.java:1784)
        at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:650)
        at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:632)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.functions.source.SourceFunction
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        ... 7 more
Ellee
  • You need to go into the logs of the Flink cluster; there you will be able to see the full exception, with a message saying why the job crashed. – Dominik Wosiński May 18 '21 at 17:30
  • @DominikWosiński thank you for your quick response. I have edited my question; please have a look at the last part, where I show the message from the **logs of the Flink cluster**. Thank you – Ellee May 18 '21 at 18:38
  • Isn't there anything else in the logs, either before or after that? It will be quite hard to do anything without the exact error. – Dominik Wosiński May 18 '21 at 19:13
  • @DominikWosiński I have edited it again. Now you can see the whole message shown by the logs of the Flink cluster. Thank you for your consistent response. – Ellee May 18 '21 at 19:52

2 Answers

Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file.

It sounds like the JAR file you uploaded is not the shaded one, or the mainClass tag in your shade plugin configuration is not set correctly.

You can rename the JAR file to a zip file, extract it, and look at the manifest file (META-INF/MANIFEST.MF) to see whether a main class is defined.

You can also run java -jar /path/to/file.jar to tell whether the JAR is actually executable.
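
The manifest check described above can also be done in code, without unzipping the JAR. This is only a minimal sketch using the standard java.util.jar API; the class name ManifestCheck and the method mainClassOf are made up for illustration, and the path to the JAR is whatever you pass on the command line.

```java
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

// Hypothetical helper, not part of Flink or Maven.
public class ManifestCheck {

    /** Returns the Main-Class value from the jar's manifest, or null if there is none. */
    public static String mainClassOf(String jarPath) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            Manifest manifest = jar.getManifest(); // null when the jar has no manifest at all
            if (manifest == null) {
                return null;
            }
            return manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.out.println("usage: java ManifestCheck <path-to-jar>");
            return;
        }
        String mainClass = mainClassOf(args[0]);
        System.out.println(mainClass == null
                ? "No Main-Class entry found"
                : "Main-Class: " + mainClass);
    }
}
```

If this reports no Main-Class entry for the file you uploaded, you are most likely looking at the unshaded artifact or at a shade configuration that did not write the manifest entry.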

Related question

How can I create an executable JAR with dependencies using Maven?

OneCricketeer
  • I have edited my question after running the suggested command in cmd; it gives me the error I have shown. I also unzipped the shaded jar file, and its manifest defines a main class with exactly the same name. But the issue has not been resolved yet – Ellee May 19 '21 at 11:54
  • You used `java -jar`? Remove `provided` from the Flink streaming dependency since that is related to the class that cannot be found. When you use provided scope, it's not put into the shaded jar. If you submit the code to Flink server, the streaming libraries might be provided there. You should also be able to run the main method from Eclipse itself – OneCricketeer May 19 '21 at 12:49
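
The point about `provided` scope in the last comment can be checked directly: if the dependency was left out of the shaded JAR, the class named in the NoClassDefFoundError will not have an entry in it. A minimal sketch using only java.util.jar; the class name JarClassCheck and the method containsClass are illustrative, not part of any library.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper for inspecting what ended up inside a (shaded) jar.
public class JarClassCheck {

    /** Returns true if the jar has a .class entry for the given fully qualified class name. */
    public static boolean containsClass(String jarPath, String className) throws IOException {
        // Class entries in a jar are stored as path-like names, e.g. com/example/Demo.class
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.out.println("usage: java JarClassCheck <path-to-jar>");
            return;
        }
        // The class reported as missing in the stack trace from the question.
        String missing = "org.apache.flink.streaming.api.functions.source.SourceFunction";
        System.out.println(missing
                + (containsClass(args[0], missing) ? " is bundled in the jar" : " is NOT bundled in the jar"));
    }
}
```

A missing entry is expected when the JAR is meant to run on a Flink cluster that supplies those classes itself; it only matters when the JAR is started standalone with `java -jar`.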

The problem has been solved. It was caused by a missing map function in the main method: the dashboard would not run the jar file while the program had no map function in it. After adding a map function to my program, I was able to load and run it in the Flink dashboard. Below is the map function I used:

        // Requires: import org.apache.flink.api.common.functions.RichMapFunction;
        DataStream<String> readingStream = env.addSource(flinkConsumer);

        // Add a map operator after the source
        readingStream.rebalance().map(new RichMapFunction<String, String>() {

            private static final long serialVersionUID = -2547861355L; // arbitrary value

            @Override
            public String map(String valueFromKafka) throws Exception {
                System.out.println(valueFromKafka);
                return "Successful";
            }
        });

        env.execute();
    }
}
Ellee