To access the headers that way, you need to use the low-level processor/transformer API in Kafka Streams. You can mix the low-level Processor API with the DSL and still run everything as a Spring Cloud Stream application. See this for more details. Basically, use a processor when your binding is a consumer and a transformer when it is a function. The processor is a terminal operation and does not let you continue the topology. With the transformer, on the other hand, you can carry on with the resulting KStream after examining the headers. For example, here is an idea:
input -> input
    .transform(new TransformerSupplier<String, String, KeyValue<String, String>>() {
        @Override
        public Transformer<String, String, KeyValue<String, String>> get() {
            // Type parameters must match the supplier's, otherwise this does not compile
            return new Transformer<String, String, KeyValue<String, String>>() {
                private ProcessorContext context;

                @Override
                public void init(ProcessorContext context) {
                    // Keep the context; it exposes metadata of the record being processed
                    this.context = context;
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    // Here you can access the headers using this.context.headers()
                    return new KeyValue<>(key, value);
                }

                @Override
                public void close() {
                }
            };
        }
    })
    .map(...)
    .groupBy(...)
    ...
Look at the comment inside the transform method. There, you get access to the headers on each incoming record.
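One practical detail: Kafka header values come back as raw byte arrays, so you usually need to decode them before use. Here is a minimal sketch of a decoding helper, assuming the producer wrote the header as UTF-8 text. The class name HeaderValues and the method asString are made up for illustration; they are not part of Kafka or Spring Cloud Stream.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of Kafka or Spring Cloud Stream.
final class HeaderValues {

    private HeaderValues() {
        // Static utility, not meant to be instantiated
    }

    // Decodes a raw header value, assuming it was written as UTF-8 text.
    // Returns null when the header is absent or carries a null value.
    static String asString(byte[] rawValue) {
        return rawValue == null ? null : new String(rawValue, StandardCharsets.UTF_8);
    }
}
```

Inside transform, you could then fetch a header with this.context.headers().lastHeader("my-header") (the header name is a placeholder), check the result for null, and pass its value() to HeaderValues.asString.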
From your question, it looks like you are trying to get the partition id of the incoming record. For that, you can call context.partition() directly. I don't think you need to access the headers for that.
Here is an SO thread on accessing headers.