
I've run into a bug I can't find a solution for. I'm trying to use Flink to process some data and store the results in Cassandra, but I keep encountering an exception that prevents me from writing anything to Cassandra. It is already mentioned in this thread, though I can't say with certainty that I understand what the answer means or what I should do. I have already turned my classes into POJOs, and yet I still get the same result.

After some tinkering, I think I may have pinpointed the cause to a `.map()` call on a `SingleOutputStreamOperator`, which turns the Tuples into my POJOs. Even so, I can't be sure, because testing is quite arduous and the bug may simply have happened to surface at that point. I also recall that it may have started when I began using `CassandraSink`, so it might be related to that, though the exception still occurs even after commenting out the sink.

Does anyone have any idea how to solve this? I have no option but to use these frameworks because it's for an assignment.

I'll also provide the source code for my POJO and the Stream operations (excuse my spaghetti code):


package infoproject;

import com.datastax.driver.core.LocalDate;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;

@Table(keyspace = "infosystems", name = "daily")
public class DailyData {

    @Column(name = "day")
    private LocalDate day = null;

    @Column(name = "sum")
    private int sum = 0;

    @Column(name = "min")
    private int min = Integer.MAX_VALUE;

    @Column(name = "max")
    private int max = Integer.MIN_VALUE;

    @Column(name = "avg")
    private double avg = 0;

    public DailyData() {}

    public DailyData(LocalDate dayin, int sumin, int minin, int maxin, double avgin) {
        day = dayin;
        sum = sumin;
        min = minin;
        max = maxin;
        avg = avgin;
    }
    /*
    public DailyData(LocalDate day, int sum, int min, int max, double avg) {
        this.setDay(day);
        this.setSum(sum);
        this.setMin(min);
        this.setMax(max);
        this.setAvg(avg);
    }
    */

    public LocalDate getDay() {
        return day;
    }

    public void setDay(LocalDate day) {
        this.day = day;
    }

    public int getSum() {
        return sum;
    }

    public void setSum(int sum) {
        this.sum = sum;
    }

    public int getMin() {
        return min;
    }

    public void setMin(int min) {
        this.min = min;
    }

    public int getMax() {
        return max;
    }

    public void setMax(int max) {
        this.max = max;
    }

    public double getAvg() {
        return avg;
    }

    public void setAvg(double avg) {
        this.avg = avg;
    }

    @Override
    public String toString() {
        return getDay() + " : " + getSum() + " " + getMin() + " " + getMax() + " " + getAvg();
    }
}

SingleOutputStreamOperator<DailyData> alerts = input
                .windowAll(TumblingEventTimeWindows.of(windowTime))
                .aggregate(new WindowAggregation())
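                // Tuple5 from WindowAggregation: f0 = sum, f1 = min, f2 = max, f3 = avg, f4 = day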
                .map(new MapFunction<Tuple5<Integer, Integer, Integer, Double, LocalDate>, DailyData>() {
                    @Override
                    public DailyData map(Tuple5<Integer, Integer, Integer, Double, LocalDate> rawData) throws Exception {
                        return new DailyData(rawData.f4, rawData.f0, rawData.f1, rawData.f2, rawData.f3);
                    }
                });
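
For completeness, here is roughly how I attach the sinks. This is a simplified sketch rather than my exact code: the contact point "127.0.0.1" is a placeholder, and right now the Cassandra part is commented out, so only the print sink is active (and the exception still occurs):

// extra imports used here:
// org.apache.flink.streaming.api.functions.sink.PrintSinkFunction
// org.apache.flink.streaming.connectors.cassandra.CassandraSink
// org.apache.flink.streaming.connectors.cassandra.ClusterBuilder
// com.datastax.driver.core.Cluster

// Currently active: just print the mapped POJOs.
alerts.addSink(new PrintSinkFunction<>());

// What the Cassandra sink looked like before I commented it out (simplified):
CassandraSink.addSink(alerts)
        .setClusterBuilder(new ClusterBuilder() {
            @Override
            protected Cluster buildCluster(Cluster.Builder builder) {
                return builder.addContactPoint("127.0.0.1").build(); // placeholder address
            }
        })
        .build();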

I'll provide the WindowAggregation class if requested; I just didn't want to bloat the question unnecessarily.
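
To give a rough idea of what it does without pasting the whole class, here is a simplified stand-in (not my exact code): the input type, a `Tuple2` of (day, value), and the per-field logic are illustrative; the real class just combines elements into the `Tuple5` of (sum, min, max, avg, day) that the `.map()` above consumes.

import com.datastax.driver.core.LocalDate;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;

// Simplified stand-in for my WindowAggregation, not the real class.
// Accumulator fields: (sum, min, max, count, day); output: (sum, min, max, avg, day).
public class WindowAggregation implements AggregateFunction<
        Tuple2<LocalDate, Integer>,
        Tuple5<Integer, Integer, Integer, Integer, LocalDate>,
        Tuple5<Integer, Integer, Integer, Double, LocalDate>> {

    @Override
    public Tuple5<Integer, Integer, Integer, Integer, LocalDate> createAccumulator() {
        return Tuple5.of(0, Integer.MAX_VALUE, Integer.MIN_VALUE, 0, null);
    }

    @Override
    public Tuple5<Integer, Integer, Integer, Integer, LocalDate> add(
            Tuple2<LocalDate, Integer> in,
            Tuple5<Integer, Integer, Integer, Integer, LocalDate> acc) {
        return Tuple5.of(acc.f0 + in.f1, Math.min(acc.f1, in.f1),
                Math.max(acc.f2, in.f1), acc.f3 + 1, in.f0);
    }

    @Override
    public Tuple5<Integer, Integer, Integer, Double, LocalDate> getResult(
            Tuple5<Integer, Integer, Integer, Integer, LocalDate> acc) {
        return Tuple5.of(acc.f0, acc.f1, acc.f2, acc.f0 / (double) acc.f3, acc.f4);
    }

    @Override
    public Tuple5<Integer, Integer, Integer, Integer, LocalDate> merge(
            Tuple5<Integer, Integer, Integer, Integer, LocalDate> a,
            Tuple5<Integer, Integer, Integer, Integer, LocalDate> b) {
        return Tuple5.of(a.f0 + b.f0, Math.min(a.f1, b.f1),
                Math.max(a.f2, b.f2), a.f3 + b.f3, a.f4 != null ? a.f4 : b.f4);
    }
}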

EDIT: I'm providing the exception as requested:

12:07:05,111 WARN  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Falling back to default Kryo serializer because Chill serializer couldn't be found.
java.lang.reflect.InvocationTargetException: null
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:444) ~[flink-core-1.14.2.jar:1.14.2]
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:467) ~[flink-core-1.14.2.jar:1.14.2]
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:299) ~[flink-core-1.14.2.jar:1.14.2]
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:140) ~[flink-core-1.14.2.jar:1.14.2]
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:37) ~[flink-core-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) ~[flink-runtime-1.14.2.jar:1.14.2]
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:130) ~[flink-runtime-1.14.2.jar:1.14.2]
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104) ~[flink-runtime-1.14.2.jar:1.14.2]
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54) ~[flink-runtime-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:103) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction.apply(PassThroughAllWindowFunction.java:35) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:48) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:34) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:477) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:603) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:239) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:200) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:136) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) [flink-runtime-1.14.2.jar:1.14.2]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) [flink-runtime-1.14.2.jar:1.14.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) [flink-runtime-1.14.2.jar:1.14.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-runtime-1.14.2.jar:1.14.2]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.NoSuchMethodError: 'void com.esotericsoftware.kryo.serializers.FieldSerializer.setIgnoreSyntheticFields(boolean)'
    at org.apache.flink.runtime.types.ScalaCollectionsRegistrar.useField$1(FlinkScalaKryoInstantiator.scala:94) ~[flink-scala_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.runtime.types.ScalaCollectionsRegistrar.apply(FlinkScalaKryoInstantiator.scala:98) ~[flink-scala_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.runtime.types.AllScalaRegistrar.apply(FlinkScalaKryoInstantiator.scala:172) ~[flink-scala_2.11-1.14.2.jar:1.14.2]
    at org.apache.flink.runtime.types.FlinkScalaKryoInstantiator.newKryo(FlinkScalaKryoInstantiator.scala:84) ~[flink-scala_2.11-1.14.2.jar:1.14.2]
    ... 44 more
  • Please share the exception so we can see exactly what's going wrong. – David Anderson Jan 12 '22 at 09:01
  • You're absolutely right, I edited my original post, sorry for the omission. – Chris Tabakakis Jan 12 '22 at 10:11
  • In what you've shared, this is just a warning that isn't causing any real problems. How are you trying to write to Cassandra, and how is that failing? – David Anderson Jan 12 '22 at 10:31
  • First of all, is it really just a warning? It seems like a warning followed by an exception to me. Edit: Second, this current code does **not** output to Cassandra, which is weird. All I do is process the stream and then add a `PrintSinkFunction` as a sink. I can also include my `WindowAggregation` class if needed but I can't see how that would affect anything, all it does is combine `Tuple5`s. – Chris Tabakakis Jan 12 '22 at 10:45
  • You're right; it tries to fallback to Kryo, and that fails. What happens if you remove the `@Table` and `@Column` annotations? I'm guessing they are causing this problem. – David Anderson Jan 12 '22 at 10:49
  • I only added that annotation to create a POJO, which was not my original plan. Originally I didn't even use that class, I used `Tuple5` and it still gave me the same error. – Chris Tabakakis Jan 12 '22 at 10:51
  • Maybe the problem is the LocalDate type? That's the only type I can see that isn't obviously easy to serialize. – David Anderson Jan 12 '22 at 11:08
  • I had problems with other classes when trying to import to Cassandra, so, following [this post](https://stackoverflow.com/questions/41480481/codec-not-found-for-requested-operation-date-java-util-date), I used `LocalDate`. However, changing it from `LocalDate` to `java.sql.Date` **didn't** give me the same serialization error; instead I got `com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [date <-> java.sql.Date]`. I don't know if the original problem persists and it was simply that **this** one broke first. Any suggestions? – Chris Tabakakis Jan 12 '22 at 11:37
  • I suggest you completely remove datastax and cassandra from the pipeline, get it working, and then swap in the cassandra sink for the printsink. – David Anderson Jan 12 '22 at 15:45

0 Answers