I am using POJO classes to deserialize json strings from Kafka to Events class ( this is the POJO class).
It has 100+ fields and some of them can be null in the source. I am able to use jackson objectmapper to convert it to Event Class' object.
But when I translate this datastream to table it throws exception.
java.io.IOException: Failed to deserialize consumer record due to
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
... 14 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
... 22 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at com.poshmark.scala.flink.EnrichLauncher$$anon$1.flatMap(DemuxLauncher.scala:206)
at com.poshmark.scala.flink.EnrichLauncher$$anon$1.flatMap(DemuxLauncher.scala:182)
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
... 28 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
... 37 more
Caused by: java.lang.NullPointerException
at org.apache.flink.table.data.conversion.StringStringConverter.toExternal(StringStringConverter.java:39)
at StreamExecCalc$7.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
... 43 more
Code in context is:
val mapped = filteredEvents.flatMap(new FlatMapFunction[String, RawEvents]() {
@throws[Exception]
def flatMap(jsonRow: String, out: Collector[RawEvents]): Unit = {
var m = new RawEvents()
try {
val objectMapper = new ObjectMapper() with ScalaObjectMapper
objectMapper.registerModule(DefaultScalaModule)
val a = objectMapper.readValue[EventMessageWrapper](jsonRow)
m = a.getMessage
} catch {
case e:Exception => {
LOGGER.error(s"Got some other kind of Throwable exception ${e.getMessage}")
LOGGER.error(jsonRow)
LOGGER.error(e.getStackTrace.mkString("\n"))
// println(s"Got some other kind of Throwable exception ${e.getMessage}")
}
}
if(m.getAt != null){
LOGGER.error("aaa"+m.toString)
out.collect(m)
}
}
})
DemuxLauncher.scala:206 points to out.collect(m)
I did read What is a NullPointerException, and how do I fix it? and I understand it is caused because some of my string values are null. But I am expecting flink to be able to handle null valued strings