I'm getting the following error when trying to run a Flink job:

Caused by: java.lang.RuntimeException: Could not extract key from ({"HASH_KEY": "6b86b273ff34fce19d6b804eff5a3f5747ada4eaa22f1d49c01e52ddb7875b4b"},{"HASH_KEY": "6b86b273ff34fce19d6b804eff5a3f5747ada4eaa22f1d49c01e52ddb7875b4b", "RECORD_SOURCE": "DEBUG", "LOAD_DATE": 1598950818140, "standortid_bk": null})
    at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:56) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:32) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[?:?]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
Caused by: java.lang.NullPointerException
    at com.***.flink_dedup.StreamingJob$1.getKey(StreamingJob.java:231) ~[?:?]
    at com.***.flink_dedup.StreamingJob$1.getKey(StreamingJob.java:1) ~[?:?]
    at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:54) ~

I tracked the issue down to the keyBy() fragment of my code (line 231 of StreamingJob), but I don't know what's wrong, since I used this exact same keyBy() mechanism in another Flink project without problems. That project was on Flink 1.9.1, though, while I'm working with 1.11.1 now. Did something change in that respect? My code for building the stream looks like this:

env.addSource(kafkaAvroGenericConsumer(inputTopic, inputSchemaK, inputSchemaV))
    .keyBy(new KeySelector<Tuple2<GenericRecord,GenericRecord>, String>() {
        public String getKey(Tuple2<GenericRecord,GenericRecord> t) {
            return t.f0.get(hashKeyField).toString();
        }
    })
    .flatMap(new DuplicateHubFilter())      //.uid() per operator for evolving states
    .returns(new TupleTypeInfo<Tuple2<GenericRecord,GenericRecord>>(new GenericRecordAvroTypeInfo(new Schema.Parser().parse(outputSchemaK)), new GenericRecordAvroTypeInfo(new Schema.Parser().parse(outputSchemaV))))
    .addSink(kafkaAvroGenericProducer(outputTopic, outputSchemaK, outputSchemaV));


    try {
        env.execute(jobName);
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
        //session.disconnect();
    }   

Funnily enough, it works like a charm when I hard-code "HASH_KEY" instead of using hashKeyField, even though the variable hashKeyField definitely contains the String "HASH_KEY"; I checked this at multiple points. I found this thread, but moving to POJOs is not an option, and the approach above used to work fine on 1.9.1, so I'm stuck on what the issue is. Is this some value/reference problem, and if so, why does it arise now? What changed?

kopaka
  • Have you tried simply printing the value of `hashKeyField` inside the `getKey()` method? Is the value `"HASH_KEY"`, not `null`? Is there any chance that the `hashKeyField` value is actually `null` at runtime? Thank you. – Mikalai Lushchytski Sep 07 '20 at 13:49
  • @MikalaiLushchytski Thanks for the hint, I only tested the value outside of the method and I now remember running into that exact same problem a few months ago. I assumed it would be sufficient for the variable to be `static`, but it actually has to be `final` in order to be used like this. – kopaka Sep 07 '20 at 14:47
  • [Extended reading](https://stackoverflow.com/questions/4732544/why-are-only-final-variables-accessible-in-anonymous-class) for people who stumble upon this thread and are curious why variables used in anonymous classes are required to be final. – kopaka Sep 07 '20 at 14:56
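
For anyone wanting to reproduce the effect without a cluster, here is a minimal plain-Java sketch. It assumes `hashKeyField` was a non-final `static` field assigned at runtime (for example in `main()`), which the question does not show. Flink serializes the `KeySelector` and ships it to the task managers, and Java serialization does not include static fields, so a worker JVM that never ran that assignment sees `null`. A captured final local, by contrast, is stored inside the anonymous class instance and travels with it. `KeyFn`, `ship` and the class name are hypothetical stand-ins, not Flink API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;

public class KeyFieldCaptureDemo {

    /** Hypothetical stand-in for Flink's KeySelector, which is also Serializable. */
    interface KeyFn extends Serializable {
        String getKey(Map<String, String> record);
    }

    // Assumption: a non-final static field that is assigned at runtime.
    static String hashKeyField;

    /** Reads the static field when called; its value is not part of the serialized object. */
    static KeyFn viaStaticField() {
        return new KeyFn() {
            @Override
            public String getKey(Map<String, String> record) {
                return record.get(hashKeyField).toString();
            }
        };
    }

    /** Captures a final local; the value is copied into the instance and serialized with it. */
    static KeyFn viaCapturedLocal(final String field) {
        return new KeyFn() {
            @Override
            public String getKey(Map<String, String> record) {
                return record.get(field).toString();
            }
        };
    }

    /** Serializes and deserializes the function, roughly what shipping it to a worker involves. */
    static KeyFn ship(KeyFn fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (KeyFn) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> record = Collections.singletonMap("HASH_KEY", "6b86b273");

        hashKeyField = "HASH_KEY"; // "client side": the field is set before the job is built
        KeyFn staticFn = ship(viaStaticField());
        KeyFn capturedFn = ship(viaCapturedLocal(hashKeyField));

        hashKeyField = null; // simulate a worker JVM where the assignment never ran

        System.out.println(capturedFn.getKey(record)); // the captured value survived shipping
        try {
            staticFn.getKey(record);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException from the static-field variant");
        }
    }
}
```

In the actual job, the equivalent change would be to copy the field name into a final local (or a `static final` constant initialized at its declaration) before building the `KeySelector`. Whether this is the whole story behind the 1.9.1 vs. 1.11.1 difference is not something this sketch can confirm.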
