Is it possible to read Kafka message header which I set in my kafkaProducer application from kstream application? My KafkaProducer looks like this; I have set header in my message
public class Producer {
private final org.slf4j.Logger log = LoggerFactory.getLogger(Producer.class);
@Value("${topic.name}")
private String TOPIC;
private final KafkaTemplate<Integer, testEvent> kafkaTemplate;
@Autowired
public Producer(KafkaTemplate<Integer, testEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendTestEvent(DataDto data) throws Exception {
TestEvent=testEvent.newBuilder()
.setTestEventId(data.getTestEventId())
.setTest(data.getTest().toString())
.build();
Message<testEvent> message = MessageBuilder
.withPayload(event)
.setHeader(KafkaHeaders.TOPIC, TOPIC)
.setHeader(KafkaHeaders.MESSAGE_KEY, 999)
.setHeader(KafkaHeaders.PARTITION_ID, 0)
.setHeader("X-Custom-Header", "Sending Custom Header with Spring Kafka")
.build();
this.kafkaTemplate.send(message);
log.info(String.format("Produced user -> %s", event));
}
the Kstream application is
public class MessageReader {
@Bean
public KStream<String, testEvent> kstreamPromotionUppercase(StreamsBuilder builder) {
KStream<String, testEvent> sourceStream = builder.stream("test-topic");
sourceStream.print(Printed.<String, testEvent>toSysOut().withLabel("Original Stream"));
KStream<String, testEvent> uppercaseStream =sourceStream.mapValues(this::MessageReaderCode);
return sourceStream;
}
How do I read my headers in kstream which I set into my kafkaproducer.