
We have written a Kafka Streams application that reads from a source topic, applies some business logic in two processors, and then writes the output to a sink topic.

Below is the code that creates the topology and adds the source, the processors (note that we add two processors) and the sink:

Topology topology = new Topology();
topology.addSource("sourceProcessor", "source-topic")
        .addProcessor("Process", () -> fileExtractProcessorObject, "sourceProcessor")
        .addProcessor("XMLJSON", () -> xmlJson, "Process")
        .addSink("sinkProcessor", "sink-topic", "XMLJSON");
KafkaStreams streams = new KafkaStreams(topology, getProperties(appConfig, sourceConfig));
streams.start();

Below is the code of the first processor:

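import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;

public class FileExtractProcessor implements Processor<String, byte[]> {
    private ProcessorContext context;

    public FileExtractProcessor() {
    }

    @Override
    public void init(ProcessorContext context) {
        // Keep the context of the task this processor instance belongs to
        this.context = context;
    }

    @Override
    public void process(String key, byte[] bytearray) {
        /* business logic (produces value, files and ac used below) */

        // This is the POJO that will be forwarded to the next processor as the value
        ProcessData pData = new ProcessData();

        // Populate the POJO and forward it to the next processor
        pData.setValue(value);
        pData.setXmlData(files);
        pData.setAppconfig(ac);

        context.forward(value.getRunKeyId(), pData, To.child("XMLJSON"));
        context.commit();
    }

    @Override
    public void close() {
        // Nothing to clean up
    }
}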

When forward() is invoked in the code above, we get a NullPointerException at the following line of org.apache.kafka.streams.processor.internals.ProcessorContextImpl#forward(K, V, org.apache.kafka.streams.processor.To):

ProcessorNode child = this.currentNode().getChild(sendTo);

java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
    at com.myapp.FileExtractProcessor.process(FileExtractProcessor.java:78)
    at com.myapp.FileExtractProcessor.process(FileExtractProcessor.java:22)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)

We are using kafka-streams-2.4.0.jar. Can you please help us find what we are missing here? We are not able to figure out why the currentNode object is null. Early help is highly appreciated.

chandu ram
  • there is a typo in the String for sink-topic: `.addSink("sinkProcessor", ""sink-topic, "XMLJSON");`. Maybe that already solves it? – Michael Heil May 14 '20 at 06:53
  • That is not the problem; it is just a typo I made when writing the question. The problem occurs when the first processor runs. – chandu ram May 14 '20 at 06:57
  • @chanduram, How do you create processor `fileExtractProcessorObject`? Do you share it anywhere? – Bartosz Wardziński May 14 '20 at 12:22
  • @user207421 Marking the question as duplicate does not seem to be appropriate. Can you please undo it? – Matthias J. Sax May 17 '20 at 23:24
  • Could this be a bug in Kafka Streams? Why do you use `To.child()`, though? You only have one child and thus can omit this parameter. -- Can you reproduce the issue using TopologyTestDriver? If yes, could you file a bug report? – Matthias J. Sax May 17 '20 at 23:29

1 Answer


Based on the code at https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L183, the error implies that currentNode() returns null.

This indicates that you violate the contract of ProcessorSupplier, which requires get() to return a new Processor instance on every call. That seems to be the case here: you pass () -> fileExtractProcessorObject to addProcessor(), which returns the same object reference each time. Instead, you need to create a new instance on each call via () -> new FileExtractProcessor().
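To illustrate why a shared instance ends up with a null currentNode(), here is a simplified, stdlib-only model. It relies on the fact that each stream task calls the supplier's get() and then init() with its own context; FakeContext, FakeProcessor, FakeTask and the task ids are hypothetical stand-ins, not Kafka classes. In this model, when two tasks share one processor, the second init() overwrites the stored context, so the first task forwards through a context that is not currently processing anything.

```java
import java.util.function.Supplier;

public class SharedProcessorDemo {

    // Hypothetical stand-in for a per-task ProcessorContext: currentNode is
    // only non-null while this context's own task is processing a record.
    static final class FakeContext {
        final String taskId;
        String currentNode;

        FakeContext(String taskId) {
            this.taskId = taskId;
        }

        String forward(String value) {
            // Mirrors the failing call this.currentNode().getChild(sendTo)
            if (currentNode == null) {
                throw new NullPointerException("currentNode is null in task " + taskId);
            }
            return taskId + ":" + currentNode + "->" + value;
        }
    }

    // Hypothetical processor that, like FileExtractProcessor, keeps the context from init().
    static final class FakeProcessor {
        private FakeContext context;

        void init(FakeContext context) {
            this.context = context;
        }

        String process(String value) {
            return context.forward(value);
        }
    }

    // Simplified task: one context, one processor obtained from a single get() call.
    static final class FakeTask {
        private final FakeContext context;
        private final FakeProcessor processor;

        FakeTask(String taskId, Supplier<FakeProcessor> supplier) {
            this.context = new FakeContext(taskId);
            this.processor = supplier.get();
            this.processor.init(context);
        }

        String process(String value) {
            context.currentNode = "Process"; // set only while this task runs
            try {
                return processor.process(value);
            } finally {
                context.currentNode = null;
            }
        }
    }

    // Builds two tasks from the same supplier, then processes one record on the first task.
    static String run(Supplier<FakeProcessor> supplier) {
        FakeTask taskA = new FakeTask("0_0", supplier);
        new FakeTask("0_1", supplier); // the second task also calls get() and init()
        try {
            return taskA.process("record");
        } catch (NullPointerException e) {
            return "NPE: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        FakeProcessor shared = new FakeProcessor();
        System.out.println(run(() -> shared));       // prints "NPE: currentNode is null in task 0_1"
        System.out.println(run(FakeProcessor::new)); // prints "0_0:Process->record"
    }
}
```

The model only matters when the application runs more than one task (for example, a source topic with several partitions); with a fresh instance per get(), each processor keeps the context of its own task.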

I created a ticket to improve the error message: https://issues.apache.org/jira/browse/KAFKA-10036
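Along the lines of that ticket, a fail-fast check can surface this mistake as a clear message instead of an NPE deep inside forward(). The sketch below is a hypothetical helper written against java.util.function.Supplier; requireFreshInstances is not a Kafka Streams API, and comparing the identity of two get() results is only a heuristic (a supplier could, for example, alternate between two cached objects).

```java
import java.util.function.Supplier;

public final class SupplierCheck {

    // Hypothetical helper, not part of Kafka Streams: calls the supplier twice and
    // fails fast with a descriptive message if both calls return the same object.
    static <T> Supplier<T> requireFreshInstances(String name, Supplier<T> supplier) {
        T first = supplier.get();
        T second = supplier.get();
        if (first == second) {
            throw new IllegalArgumentException("Supplier for '" + name
                    + "' returned the same instance twice; it must return a new instance on every get() call");
        }
        return supplier;
    }

    public static void main(String[] args) {
        // A supplier that creates a new object per call passes the check
        requireFreshInstances("XMLJSON", Object::new);

        // A supplier that hands out a pre-built object is rejected
        Object shared = new Object();
        try {
            requireFreshInstances("Process", () -> shared);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The check discards its two probe instances, so it assumes processor constructors are cheap and free of side effects. Since ProcessorSupplier in 2.4 declares a single get() method, a checked supplier should be adaptable with a method reference such as `requireFreshInstances("Process", FileExtractProcessor::new)::get`, though that adaptation is an untested sketch.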

Matthias J. Sax