
I am using spring-cloud-stream-binder-kafka-streams:3.1.1 with the functional programming model. How can I retrieve all headers in the processor function?

Java code

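import java.util.function.Function;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class KafkaMessageApplication {
    public static void main(String[] args) {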
        SpringApplication.run(KafkaMessageApplication.class, args);
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> process() {
        // TODO investigate headers on the incoming message
        // For example, find partition key on which message was received and publish to same partition key on destination topic
        return input -> input;
    }
}
user2459396

1 Answer


To access the headers like that, you need to use the low-level processor/transformer API in Kafka Streams. You can mix the low-level Processor API with the DSL while still running it as a Spring Cloud Stream application. See this for more details. Basically, you need to use a processor in the case of a consumer and a transformer in the case of a function. A processor is a terminal operation and does not let you continue the topology any further. A transformer, on the other hand, returns a KStream, so you can keep chaining operations after examining the headers. For example, here is an idea:

input -> input
        .transform(new TransformerSupplier<String, String, KeyValue<String, String>>() {
            @Override
            public Transformer<String, String, KeyValue<String, String>> get() {
                // The type parameters must match those of the supplier (String keys and values)
                return new Transformer<String, String, KeyValue<String, String>>() {
                    private ProcessorContext context;

                    @Override
                    public void init(ProcessorContext context) {
                        // Called once per task, before any record is transformed
                        this.context = context;
                    }

                    @Override
                    public KeyValue<String, String> transform(String key, String value) {
                        // Here you can access the headers using this.context.headers()
                        return new KeyValue<>(key, value);
                    }

                    @Override
                    public void close() {
                        // Nothing to clean up
                    }
                };
            }
        })
        .map(...)
        .groupBy(...)
        ...

Look at the comment inside the transform method. There, you get access to the headers on each incoming record.

From your question, it looks like you are trying to get the partition id of the incoming record. For that, you can call context.partition() directly. I don't think you need to access the headers for that.
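To make that concrete, here is a minimal sketch of a standalone transformer that reads both the partition and the headers. The class name HeaderLoggingTransformer and the helper headersAsStrings are hypothetical names made up for this example, and it assumes the header values are UTF-8 text, which may not hold for your producers. It uses the Transformer API that ships with the Kafka Streams version behind the 3.1.1 binder.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical class name, used only for illustration.
class HeaderLoggingTransformer implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        // Keep the context so transform() can query per-record metadata.
        this.context = context;
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        // Partition of the source topic the current record was read from.
        int partition = context.partition();
        // Headers attached to the current record.
        Map<String, String> headers = headersAsStrings(context.headers());
        System.out.println("partition=" + partition + ", headers=" + headers);
        // Pass the record through unchanged so the DSL chain can continue.
        return KeyValue.pair(key, value);
    }

    @Override
    public void close() {
        // Nothing to clean up.
    }

    // Hypothetical helper: decodes header values as UTF-8 (an assumption).
    // If a key appears more than once, the last value wins.
    public static Map<String, String> headersAsStrings(Headers headers) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Header header : headers) {
            byte[] raw = header.value();
            result.put(header.key(), raw == null ? null : new String(raw, StandardCharsets.UTF_8));
        }
        return result;
    }
}
```

With a class like this, the function body could shrink to something like input -> input.transform(HeaderLoggingTransformer::new), since the constructor reference can serve as the TransformerSupplier and gives each task its own transformer instance.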

Here is an SO thread on accessing headers.

sobychacko
  • That's really helpful. Thank you very much! – user2459396 Mar 15 '21 at 09:18
  • @sobychacko so there is no way to use the annotations from the old-style StreamListener approach, like '@Headers', '@Header' or '@Payload'? The solution you provided works, but it looks way worse from a readability perspective than what was deprecated... – Przemysław Gęsieniec Jun 21 '21 at 12:51
  • Hi, the `@Header` or `@Payload` solutions were never available for the Kafka Streams binder, even with the `StreamListener`. You always needed to access the headers differently. – sobychacko Jun 21 '21 at 14:10
  • @sobychacko I tried this approach, but the ProcessorContext object I get is always null. What could be going wrong? – Shades88 Jan 26 '22 at 19:09