
I created a program to count words in Wikipedia edit events. It works without any errors. Then I created a Cassandra table with two columns, word (text) and count (bigint). The problem appears when I try to write the words and counts into the Cassandra table.
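The table was created roughly like this (a sketch; the primary key on word is an assumption, and the keyspace/table names match the INSERT query in the code):

    -- mar1 keyspace and examplewordcount table, as used in the INSERT below
    CREATE TABLE mar1.examplewordcount (
        word text PRIMARY KEY,   -- assumed primary key
        count bigint
    );

My program is the following: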

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
    import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
    import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
    import org.apache.flink.util.Collector;

    public class WordCount_in_cassandra {

        public static void main(String[] args) throws Exception {

            // Check the input parameters
            final ParameterTool params = ParameterTool.fromArgs(args);

            // Set up the execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Make the parameters available in the web interface
            env.getConfig().setGlobalJobParameters(params);

            // Stream of Wikipedia edit-event titles
            DataStream<String> text = env.addSource(new WikipediaEditsSource())
                    .map(WikipediaEditEvent::getTitle);

            DataStream<Tuple2<String, Integer>> counts =
                    // split up the lines into pairs (2-tuples) containing: (word, 1)
                    text.flatMap(new Tokenizer())
                            // group by the tuple field "0" and sum up tuple field "1"
                            .keyBy(0).sum(1);

            // Emit the result
            if (params.has("output")) {
                counts.writeAsText(params.get("output"));
            } else {
                System.out.println("Printing result to stdout. Use --output to specify output path.");

                counts.print();

                CassandraSink.addSink(counts)
                        .setQuery("INSERT INTO mar1.examplewordcount (word, count) VALUES (?, ?);")
                        .setHost("127.0.0.1")
                        .build();
            }

            // Execute the program
            env.execute("Streaming WordCount");
        }

        public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                // Normalize and split the line
                String[] tokens = value.toLowerCase().split("\\W+");

                // Emit the pairs
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(new Tuple2<>(token, 1));
                    }
                }
            }
        }
    }

After running this code I got this error:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the AbstractCassandraTupleSink is not serializable. The object probably contains or references non serializable fields.

How can I solve this issue?

  • Possible duplicate of [How to debug serializable exception in Flink?](https://stackoverflow.com/questions/47945603/how-to-debug-serializable-exception-in-flink) – pppery May 29 '23 at 01:46

1 Answer

I tried to replicate your problem, but I didn't get the serialization issue. Since I don't have a Cassandra cluster running, it fails in the open() call instead - but that happens after serialization, as open() is called when the operator is started by the TaskManager. So it feels like something is wrong with your dependencies, such that the wrong class is somehow being used for the actual Cassandra sink.
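If you want to pin down exactly which field isn't serializable, you can try Java-serializing the suspect function or sink object yourself, the same way Flink does before shipping operators to the TaskManagers. A minimal sketch using only the JDK (Flink also ships a similar helper, ClosureCleaner.ensureSerializable, if you'd rather use that):

    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;

    public class SerializationCheck {

        // Attempt standard Java serialization of an object; on failure the
        // NotSerializableException in the stack trace names the first
        // offending class.
        public static void checkSerializable(Object obj) {
            try (ObjectOutputStream out =
                     new ObjectOutputStream(new ByteArrayOutputStream())) {
                out.writeObject(obj);
                System.out.println(obj.getClass().getName() + " is serializable");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }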

BTW, it's always helpful to include context for your error - e.g. which version of Flink you're using, whether you're running from an IDE or on a cluster, etc.

Just FYI, here are the Flink jars on my classpath...

    flink-java/1.7.0/flink-java-1.7.0.jar
    flink-core/1.7.0/flink-core-1.7.0.jar
    flink-annotations/1.7.0/flink-annotations-1.7.0.jar
    force-shading/1.7.0/force-shading-1.7.0.jar
    flink-metrics-core/1.7.0/flink-metrics-core-1.7.0.jar
    flink-shaded-asm/5.0.4-5.0/flink-shaded-asm-5.0.4-5.0.jar
    flink-streaming-java_2.12/1.7.0/flink-streaming-java_2.12-1.7.0.jar
    flink-runtime_2.12/1.7.0/flink-runtime_2.12-1.7.0.jar
    flink-queryable-state-client-java_2.12/1.7.0/flink-queryable-state-client-java_2.12-1.7.0.jar
    flink-shaded-netty/4.1.24.Final-5.0/flink-shaded-netty-4.1.24.Final-5.0.jar
    flink-shaded-guava/18.0-5.0/flink-shaded-guava-18.0-5.0.jar
    flink-hadoop-fs/1.7.0/flink-hadoop-fs-1.7.0.jar
    flink-shaded-jackson/2.7.9-5.0/flink-shaded-jackson-2.7.9-5.0.jar
    flink-clients_2.12/1.7.0/flink-clients_2.12-1.7.0.jar
    flink-optimizer_2.12/1.7.0/flink-optimizer_2.12-1.7.0.jar
    flink-streaming-scala_2.12/1.7.0/flink-streaming-scala_2.12-1.7.0.jar
    flink-scala_2.12/1.7.0/flink-scala_2.12-1.7.0.jar
    flink-shaded-asm-6/6.2.1-5.0/flink-shaded-asm-6-6.2.1-5.0.jar
    flink-test-utils_2.12/1.7.0/flink-test-utils_2.12-1.7.0.jar
    flink-test-utils-junit/1.7.0/flink-test-utils-junit-1.7.0.jar
    flink-runtime_2.12/1.7.0/flink-runtime_2.12-1.7.0-tests.jar
    flink-queryable-state-runtime_2.12/1.7.0/flink-queryable-state-runtime_2.12-1.7.0.jar
    flink-connector-cassandra_2.12/1.7.0/flink-connector-cassandra_2.12-1.7.0.jar
    flink-connector-wikiedits_2.12/1.7.0/flink-connector-wikiedits_2.12-1.7.0.jar
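If you're building with Maven, it's also worth checking that the Cassandra connector artifact matches both your Flink version and the Scala binary version of your other Flink dependencies. Something along these lines (a sketch matching the 1.7.0 / Scala 2.12 jars above):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-cassandra_2.12</artifactId>
        <version>1.7.0</version>
    </dependency>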
  • Dear @kkrugler, thank you for your answer. I use Flink 1.6.1 and run this from IntelliJ IDEA. Also, I have a Cassandra cluster with two nodes, but I run Flink locally and want to write data into the Cassandra cluster. – M_Gh Dec 25 '18 at 07:54
  • I updated the Flink version from 1.6.1 to 1.7.0 and ran the program locally. It ran without any errors, but I could not run the program against the Cassandra cluster. I got this error: "Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed." – M_Gh Dec 25 '18 at 09:12
  • I was able to run the program against the Cassandra cluster. The program did not run correctly because I had put two IP addresses in "setHost". – M_Gh Dec 25 '18 at 09:33
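A follow-up on that last comment: setHost() configures a single contact point, so to give the sink both nodes of the cluster you can pass a ClusterBuilder instead. A sketch against the Flink Cassandra connector API, with placeholder IPs (counts is the stream from the question; Cluster comes from the DataStax driver, ClusterBuilder from org.apache.flink.streaming.connectors.cassandra):

    CassandraSink.addSink(counts)
            .setQuery("INSERT INTO mar1.examplewordcount (word, count) VALUES (?, ?);")
            .setClusterBuilder(new ClusterBuilder() {
                @Override
                protected Cluster buildCluster(Cluster.Builder builder) {
                    // Placeholder addresses for the two Cassandra nodes
                    return builder.addContactPoints("10.0.0.1", "10.0.0.2").build();
                }
            })
            .build();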