We have written a Kafka Streams application that reads from a source topic, applies some business logic in two processors, and then writes the output to a sink topic.
Below is the code that creates the topology and adds the source, the processors (please note that we are adding two processors), and the sink:
Topology topology = new Topology();
topology.addSource("sourceProcessor", "source-topic")
        .addProcessor("Process", () -> fileExtractProcessorObject, "sourceProcessor")
        .addProcessor("XMLJSON", () -> xmlJson, "Process")
        .addSink("sinkProcessor", "sink-topic", "XMLJSON");
KafkaStreams streams = new KafkaStreams(topology, getProperties(appConfig, sourceConfig));
streams.start();
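One detail of the wiring above that may matter: both supplier lambdas hand back an object that was built beforehand, whereas the Kafka Streams documentation for ProcessorSupplier says that get() should return a new instance on every call, because a processor is created and initialised per task. We have not confirmed that this is related to the exception. The following is only a plain-Java sketch of the difference, with no Kafka dependency; DemoProcessor, SupplierDemo and the "task-A"/"task-B" strings are hypothetical stand-ins, not Kafka classes or values from our application.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a stateful processor; not a Kafka class.
class DemoProcessor {
    // Set by init(), in the same way our real processor stores its ProcessorContext.
    private String context;

    void init(String context) {
        this.context = context;
    }

    String context() {
        return context;
    }
}

class SupplierDemo {
    // Simulates a framework that asks the supplier for one processor per task
    // and initialises each one, then reports the context the first task ends up with.
    static String contextSeenByFirstTask(Supplier<DemoProcessor> supplier) {
        DemoProcessor taskA = supplier.get();
        taskA.init("task-A");
        DemoProcessor taskB = supplier.get();
        taskB.init("task-B");
        return taskA.context();
    }

    public static void main(String[] args) {
        DemoProcessor shared = new DemoProcessor();
        // Shared instance: the second init() overwrites the first task's context.
        System.out.println(contextSeenByFirstTask(() -> shared));       // prints task-B
        // Fresh instance per call: each task keeps its own context.
        System.out.println(contextSeenByFirstTask(DemoProcessor::new)); // prints task-A
    }
}
```

With a shared instance, the first task ends up holding the context of the second one. In our topology the equivalent change would be a supplier such as FileExtractProcessor::new, assuming the processors can be constructed without arguments.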
Below is the code of the first processor:
public class FileExtractProcessor implements Processor<String, byte[]> {
    private ProcessorContext context;

    public FileExtractProcessor() {
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, byte[] bytearray) {
        /* business logic */
        // This is the POJO class that will be forwarded to the next processor as the value
        ProcessData pData = new ProcessData();
        // Populate the POJO and forward it to the next processor
        pData.setValue(value);
        pData.setXmlData(files);
        pData.setAppconfig(ac);
        context.forward(value.getRunKeyId(), pData, To.child("XMLJSON"));
        context.commit();
    }
}
In the code above, when the forward method is invoked, we get a NullPointerException at the following line of
org.apache.kafka.streams.processor.internals.ProcessorContextImpl#forward(K, V, org.apache.kafka.streams.processor.To):
ProcessorNode child = this.currentNode().getChild(sendTo);
java.lang.NullPointerException
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
at com.myapp.FileExtractProcessor.process(FileExtractProcessor.java:78)
at com.myapp.FileExtractProcessor.process(FileExtractProcessor.java:22)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
We are using kafka-stream-2.4.0.jar. Can you please help us find what we are missing here? We are not able to figure out why the currentNode object is null. Early help is highly appreciated.