1

Is it possible to read Kafka message header which I set in my kafkaProducer application from kstream application? My KafkaProducer looks like this; I have set header in my message

public class Producer {
private final org.slf4j.Logger log = LoggerFactory.getLogger(Producer.class);
@Value("${topic.name}")
private String TOPIC;

private final KafkaTemplate<Integer, testEvent> kafkaTemplate;

@Autowired
public Producer(KafkaTemplate<Integer, testEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
 }
 public void sendTestEvent(DataDto data) throws Exception {
TestEvent=testEvent.newBuilder()
.setTestEventId(data.getTestEventId())
.setTest(data.getTest().toString())
.build();
Message<testEvent> message = MessageBuilder
                .withPayload(event)
                .setHeader(KafkaHeaders.TOPIC, TOPIC)
                .setHeader(KafkaHeaders.MESSAGE_KEY, 999)
                .setHeader(KafkaHeaders.PARTITION_ID, 0)
                .setHeader("X-Custom-Header", "Sending Custom Header with Spring Kafka")
                .build();
      this.kafkaTemplate.send(message);
      log.info(String.format("Produced user -> %s", event));
}

the Kstream application is

public class MessageReader {


@Bean
public KStream<String, testEvent> kstreamPromotionUppercase(StreamsBuilder builder) {
    
    KStream<String, testEvent> sourceStream = builder.stream("test-topic");

    sourceStream.print(Printed.<String, testEvent>toSysOut().withLabel("Original Stream"));
    
    KStream<String, testEvent> uppercaseStream =sourceStream.mapValues(this::MessageReaderCode);
    
    return sourceStream;
}

How do I read my headers in kstream which I set into my kafkaproducer.

Mark Rotteveel
  • 100,966
  • 191
  • 140
  • 197
Bunny
  • 111
  • 2
  • 11
  • Does this answer your question? [Is it possible to access message headers with Kafka Streams?](https://stackoverflow.com/questions/46736484/is-it-possible-to-access-message-headers-with-kafka-streams) – Paul Jul 17 '20 at 11:17
  • Hi Paul, I saw that but is there any example available. I know how to work with Stream DSL but not with Processor API. – Bunny Jul 17 '20 at 11:24
  • I'm interested in Kafka as well and prefer the Stream DSL myself. It took me some digging, but I think what you're after could be the StreamsMetadataState: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java – Paul Jul 17 '20 at 12:03

1 Answers1

1

Here is a simple example of how you can process your KStream:

sourceStream.process(() -> new ReadKafkaHeaderProcessor());

Then in the process method of the ReadKafkaHeaderProcessor you can do:

@Override
public void process(Object key, Object value) {
    Header header = context.headers().lastHeader(KafkaHeaders.TOPIC);
}
Nic Pegg
  • 485
  • 3
  • 7