I'm getting the following error when trying to run a Flink job:
Caused by: java.lang.RuntimeException: Could not extract key from ({"HASH_KEY": "6b86b273ff34fce19d6b804eff5a3f5747ada4eaa22f1d49c01e52ddb7875b4b"},{"HASH_KEY": "6b86b273ff34fce19d6b804eff5a3f5747ada4eaa22f1d49c01e52ddb7875b4b", "RECORD_SOURCE": "DEBUG", "LOAD_DATE": 1598950818140, "standortid_bk": null})
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:56) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:32) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) ~[?:?]
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) ~[?:?]
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[?:?]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[?:?]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
Caused by: java.lang.NullPointerException
at com.***.flink_dedup.StreamingJob$1.getKey(StreamingJob.java:231) ~[?:?]
at com.***.flink_dedup.StreamingJob$1.getKey(StreamingJob.java:1) ~[?:?]
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:54) ~
I tracked the issue down to the keyBy() fragment of my code (line 231 of StreamingJob), but I don't know what's wrong, since I used this exact same keyBy() mechanism in another Flink project without problems. That was on Flink 1.9.1, though, while I'm working with 1.11.1 now. Did something change in that respect? My code for building up the stream looks like this:
env.addSource(kafkaAvroGenericConsumer(inputTopic, inputSchemaK, inputSchemaV))
    .keyBy(new KeySelector<Tuple2<GenericRecord, GenericRecord>, String>() {
        @Override
        public String getKey(Tuple2<GenericRecord, GenericRecord> t) {
            return t.f0.get(hashKeyField).toString();
        }
    })
    .flatMap(new DuplicateHubFilter()) // .uid() per operator for evolving states
    .returns(new TupleTypeInfo<Tuple2<GenericRecord, GenericRecord>>(
        new GenericRecordAvroTypeInfo(new Schema.Parser().parse(outputSchemaK)),
        new GenericRecordAvroTypeInfo(new Schema.Parser().parse(outputSchemaV))))
    .addSink(kafkaAvroGenericProducer(outputTopic, outputSchemaK, outputSchemaV));

try {
    env.execute(jobName);
} catch (Exception e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    //session.disconnect();
}
Funnily enough, it works like a charm when I manually enter "HASH_KEY" instead of hashKeyField, but the variable hashKeyField is definitely filled with the String "HASH_KEY"; I checked this at multiple points. I found this thread, but moving to POJOs is not an option, and the approach above used to work fine on 1.9.1, so I'm stuck as to what the issue might be. Is this some value/reference problem, and why does it only arise now, what changed?
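For reference, this is the variant that does work for me, with the field name hard-coded in the key selector instead of taken from hashKeyField (a minimal sketch of just the keyBy() fragment; everything else in the pipeline above is unchanged):

    // Works: "HASH_KEY" is a literal here instead of being read from the hashKeyField variable.
    .keyBy(new KeySelector<Tuple2<GenericRecord, GenericRecord>, String>() {
        @Override
        public String getKey(Tuple2<GenericRecord, GenericRecord> t) {
            return t.f0.get("HASH_KEY").toString();
        }
    })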