I've run into a bug that I cannot find a solution for. I'm using Flink to process some data and store the results in Cassandra, but I keep hitting an exception that prevents me from writing anything to Cassandra. The exception is already mentioned in this thread, though I can't say with certainty that I understand what the answer means or what I should do. I have already turned my classes into POJOs, and I still get the same result.
After some tinkering, I think the bug is caused by a .map() call on a SingleOutputStreamOperator, which turns the Tuples into my POJOs. I can't be sure, though, because testing it is quite arduous and the bug may simply have happened to appear at that point. I recall that it may have started when I began using CassandraSink, so it might be related to that, although it still occurs after commenting out the sink.
Does anyone have any idea how to solve this? I have no option but to use these frameworks because it's for an assignment.
I'll also provide the source code for my POJO and the Stream operations (excuse my spaghetti code):
package infoproject;

import com.datastax.driver.core.LocalDate;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;

@Table(keyspace = "infosystems", name = "daily")
public class DailyData {

    @Column(name = "day")
    private LocalDate day = null;
    @Column(name = "sum")
    private int sum = 0;
    @Column(name = "min")
    private int min = Integer.MAX_VALUE;
    @Column(name = "max")
    private int max = Integer.MIN_VALUE;
    @Column(name = "avg")
    private double avg = 0;

    public DailyData() {}

    public DailyData(LocalDate dayin, int sumin, int minin, int maxin, double avgin) {
        day = dayin;
        sum = sumin;
        min = minin;
        max = maxin;
        avg = avgin;
    }

    /*public DailyData(LocalDate day, int sum, int min, int max, double avg) {
        this.setDay(day);
        this.setSum(sum);
        this.setMin(min);
        this.setMax(max);
        this.setAvg(avg);
    } */

    public LocalDate getDay() {
        return day;
    }

    public void setDay(LocalDate day) {
        this.day = day;
    }

    public int getSum() {
        return sum;
    }

    public void setSum(int sum) {
        this.sum = sum;
    }

    public int getMin() {
        return min;
    }

    public void setMin(int min) {
        this.min = min;
    }

    public int getMax() {
        return max;
    }

    public void setMax(int max) {
        this.max = max;
    }

    public double getAvg() {
        return avg;
    }

    public void setAvg(double avg) {
        this.avg = avg;
    }

    @Override
    public String toString() {
        return day.toString() + " : " + getSum() + " " + getMin() + " " + getMax() + " " + getAvg();
    }
}

SingleOutputStreamOperator<DailyData> alerts = input
        .windowAll(TumblingEventTimeWindows.of(windowTime))
        .aggregate(new WindowAggregation())
        .map(new MapFunction<Tuple5<Integer, Integer, Integer, Double, LocalDate>, DailyData>() {
            @Override
            public DailyData map(Tuple5<Integer, Integer, Integer, Double, LocalDate> rawData) throws Exception {
                return new DailyData(rawData.f4, rawData.f0, rawData.f1, rawData.f2, rawData.f3);
            }
        });
I'll provide the WindowAggregation class if requested, I just didn't want to bloat the question unnecessarily.
EDIT: I'm providing the exception as requested:
12:07:05,111 WARN org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Falling back to default Kryo serializer because Chill serializer couldn't be found.
java.lang.reflect.InvocationTargetException: null
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:444) ~[flink-core-1.14.2.jar:1.14.2]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:467) ~[flink-core-1.14.2.jar:1.14.2]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:299) ~[flink-core-1.14.2.jar:1.14.2]
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:140) ~[flink-core-1.14.2.jar:1.14.2]
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:37) ~[flink-core-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) ~[flink-runtime-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:130) ~[flink-runtime-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104) ~[flink-runtime-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54) ~[flink-runtime-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:103) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction.apply(PassThroughAllWindowFunction.java:35) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:48) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:34) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:477) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:603) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:239) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:200) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:136) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-streaming-java_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) [flink-runtime-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) [flink-runtime-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) [flink-runtime-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-runtime-1.14.2.jar:1.14.2]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.NoSuchMethodError: 'void com.esotericsoftware.kryo.serializers.FieldSerializer.setIgnoreSyntheticFields(boolean)'
at org.apache.flink.runtime.types.ScalaCollectionsRegistrar.useField$1(FlinkScalaKryoInstantiator.scala:94) ~[flink-scala_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.types.ScalaCollectionsRegistrar.apply(FlinkScalaKryoInstantiator.scala:98) ~[flink-scala_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.types.AllScalaRegistrar.apply(FlinkScalaKryoInstantiator.scala:172) ~[flink-scala_2.11-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.types.FlinkScalaKryoInstantiator.newKryo(FlinkScalaKryoInstantiator.scala:84) ~[flink-scala_2.11-1.14.2.jar:1.14.2]
... 44 more
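
Since the root cause in the trace is a NoSuchMethodError on a Kryo class, one check that might help narrow this down is finding out which jar that class is actually loaded from. Below is a minimal sketch, assuming it is run with the same classpath as the Flink job. The class name ClasspathCheck and its locate helper are hypothetical names used only for illustration; they are not part of Flink, Kryo, or the Cassandra driver.

```java
import java.security.CodeSource;
import java.security.ProtectionDomain;

// Hypothetical helper: reports where a class was loaded from.
public class ClasspathCheck {

    /** Returns the jar or directory a class was loaded from, or a marker string if unknown. */
    public static String locate(String className) {
        try {
            // 'false' avoids running static initializers; only the class itself is resolved.
            Class<?> cls = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            ProtectionDomain domain = cls.getProtectionDomain();
            CodeSource source = (domain == null) ? null : domain.getCodeSource();
            if (source == null || source.getLocation() == null) {
                // JDK classes loaded by the bootstrap loader have no code source.
                return "<bootstrap or unknown>";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "<not on classpath>";
        }
    }

    public static void main(String[] args) {
        // Defaults to the Kryo class named in the NoSuchMethodError above.
        String name = args.length > 0
                ? args[0]
                : "com.esotericsoftware.kryo.serializers.FieldSerializer";
        System.out.println(name + " -> " + locate(name));
    }
}
```

If the printed location is a Kryo jar other than the one expected alongside flink-core 1.14.2, that would at least be consistent with two Kryo versions being mixed on the classpath, though this check alone does not prove it.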