
With the addition of headers to records (ProducerRecord & ConsumerRecord) in Kafka 0.11, is it possible to get these headers when processing a topic with Kafka Streams? When calling methods like map on a KStream, the function receives the key and the value of the record, but I can't see any way to access the headers. It would be nice if we could just map over the ConsumerRecords.

For example:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((key, value) ->  ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
    ... 

Something like this would be ideal:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((record) -> {
        record.headers();
        record.key();
        record.value();
    })
    ...
zero323
Nathan Myles

1 Answer


Record headers are accessible since version 2.0.0 (cf. KIP-244 for details).

You can access record metadata via the Processor API (i.e., via transform(), transformValues(), or process()) through the provided "context" object (cf. https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context).
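A minimal sketch of the transformValues() route, assuming Kafka Streams 2.x (transformValues() was deprecated in later releases), String keys and values, and a hypothetical header named "trace-id". The class names, the headerAsString() helper, and "output-topic" are made up for illustration. The transformer keeps the ProcessorContext from init() and calls ProcessorContext#headers() (added by KIP-244) for each record:

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

public class HeaderAwareTransform {

    // Hypothetical helper: last value of header `key` decoded as UTF-8, or null if absent.
    static String headerAsString(Headers headers, String key) {
        if (headers == null) {
            return null;
        }
        Header header = headers.lastHeader(key);
        if (header == null || header.value() == null) {
            return null;
        }
        return new String(header.value(), StandardCharsets.UTF_8);
    }

    // Prefixes each value with the record's (hypothetical) "trace-id" header.
    static class TraceIdPrefixer implements ValueTransformer<String, String> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            // Keep the context; its record-level accessors change with every record.
            this.context = context;
        }

        @Override
        public String transform(String value) {
            // ProcessorContext#headers() (2.0+, KIP-244): headers of the current record.
            String traceId = headerAsString(context.headers(), "trace-id");
            return "[" + traceId + "] " + value;
        }

        @Override
        public void close() {
            // nothing to release
        }
    }

    static void buildTopology(StreamsBuilder builder) {
        KStream<String, String> stream =
                builder.stream("some-topic", Consumed.with(Serdes.String(), Serdes.String()));
        // An explicitly typed supplier avoids overload ambiguity between
        // the ValueTransformer and ValueTransformerWithKey variants.
        ValueTransformerSupplier<String, String> supplier = TraceIdPrefixer::new;
        KStream<String, String> prefixed = stream.transformValues(supplier);
        // Back in the DSL: filter/map/etc. work as usual on the transformed stream.
        prefixed.filter((key, value) -> value != null)
                .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
    }
}
```

Because the context only reflects the record currently being processed, headers() has to be called inside transform() rather than read once in init().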

Update

As of the 2.7.0 release, the Processor API was improved (cf. KIP-478) by adding a new type-safe api.Processor interface with a process(Record) method instead of process(K, V). With this interface, headers (and the record timestamp) are accessible via the Record class.
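A minimal sketch of the new api.Processor, assuming Kafka Streams 2.7+ and wired into a plain Topology (Processor API), since that is where KIP-478 landed. The "trace-id" header, the class and node names, and "output-topic" are hypothetical:

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

public class HeaderAwareProcessor {

    // Forwards each record with its value prefixed by the (hypothetical) "trace-id" header.
    static class TraceIdProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            // Headers travel with the Record itself; no context lookup is needed.
            Header header = record.headers().lastHeader("trace-id");
            String traceId = (header == null || header.value() == null)
                    ? null
                    : new String(header.value(), StandardCharsets.UTF_8);
            // withValue() returns a new Record that keeps key, timestamp, and headers.
            context.forward(record.withValue("[" + traceId + "] " + record.value()));
        }
    }

    static Topology buildTopology() {
        Topology topology = new Topology();
        topology.addSource("Source",
                Serdes.String().deserializer(), Serdes.String().deserializer(), "some-topic");
        // Typed supplier selects the api.ProcessorSupplier overload of addProcessor().
        ProcessorSupplier<String, String, String, String> supplier = TraceIdProcessor::new;
        topology.addProcessor("TraceId", supplier, "Source");
        topology.addSink("Sink", "output-topic",
                Serdes.String().serializer(), Serdes.String().serializer(), "TraceId");
        return topology;
    }
}
```

Since Record is immutable, withValue() is the idiomatic way to change the value while carrying the original headers and timestamp downstream.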

This new feature is not yet available in the "PAPI methods" of the DSL, though (e.g., KStream#process(), KStream#transform(), and siblings).

+++++

Prior to 2.0, the context only exposes topic, partition, offset, and timestamp, but not headers; in those older versions, Streams in fact drops headers on read.

Metadata is not available at the DSL level, though. However, there is also work in progress to extend the DSL via KIP-159.

Matthias J. Sax
  • To clarify what Matthias said: Yes, the Processor API in Kafka Streams gives you access to record metadata such as topic name, partition number, offset, etc. The DSL in Kafka Streams does not give you access. But because you can combine the Processor API and the DSL, you can still write a DSL-based stream processing application that accesses the record metadata by using the DSL's `transform()` or `transformValues()` function, which allows you to pass in a Processor/Transformer from the Processor API. – miguno Oct 16 '17 at 08:26
  • Thanks for the information guys, I'll keep an eye out for when the metadata is added to the DSL level so that this answer can be updated. – Nathan Myles Oct 16 '17 at 15:02
  • @MatthiasJ.Sax and @MichaelG.Noll: in https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams, for the `RecordContext` proposal, it doesn't seem to have the headers exposed. Is that something that will be added? – Nathan Myles Oct 16 '17 at 15:39
  • There are no plans to extend `RecordContext` via KIP-159. When we add header support, it's TBD what this would look like, but I would assume we would add new methods to `RecordContext` for this. Watch the Jira if you are interested in details :) – Matthias J. Sax Oct 16 '17 at 17:08
  • @MatthiasJ.Sax Still not 100% clear to me: does this mean that with Streams 1.0.1 there is no way to access the headers of a message, either via the DSL or via the Processor API? I am asking because, checking ProcessorContext (https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/processor/ProcessorContext.html), I cannot find the headers of the currently processed message. – Vassilis Nov 23 '18 at 15:08
  • Accessible metadata in `0.11`, `1.0` and `1.1` does not include headers. You will need to use Kafka Streams `2.0` to get header access: https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API -- I'll update the answer. – Matthias J. Sax Nov 23 '18 at 18:53
  • Using `KStream#to(TopicNameExtractor)`, you get access to the record headers, too. – Matthias J. Sax Feb 26 '19 at 23:26
  • What a nice coincidence: I saw this just a day after you added the info about the 2.7.0 upgrade. I took a chance and tried out api.Processor from 2.8.0, and everything looks fine apart from one major issue: it's not compatible with KStream. Or am I missing something? – Konrad May 06 '21 at 07:21
  • Are you referring to the `KStream#process()` method? As I tried to point out, the KIP only changed the PAPI, i.e., `Topology`, but the DSL changes are not implemented yet. Cf. https://issues.apache.org/jira/browse/KAFKA-8410 – Matthias J. Sax May 06 '21 at 15:56
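The comment about KStream#to(TopicNameExtractor) can be sketched as follows, assuming Kafka Streams 2.0+ and a hypothetical "tenant" header. The extractor class, the topicFor() helper, and the "events-<tenant>" topic naming are made up for illustration. TopicNameExtractor#extract() receives a RecordContext whose headers() exposes the current record's headers:

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.TopicNameExtractor;

public class HeaderRouting {

    // Hypothetical routing rule: "events-<tenant>" or "events-unknown" if the header is missing.
    static String topicFor(Headers headers) {
        Header tenant = headers == null ? null : headers.lastHeader("tenant");
        if (tenant == null || tenant.value() == null) {
            return "events-unknown";
        }
        return "events-" + new String(tenant.value(), StandardCharsets.UTF_8);
    }

    // Picks the output topic per record from the record's headers.
    static class TenantTopicExtractor implements TopicNameExtractor<String, String> {
        @Override
        public String extract(String key, String value, RecordContext recordContext) {
            return topicFor(recordContext.headers());
        }
    }

    static void buildTopology(StreamsBuilder builder) {
        builder.stream("some-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .to(new TenantTopicExtractor(), Produced.with(Serdes.String(), Serdes.String()));
    }
}
```

Kafka Streams does not create dynamically chosen output topics, so every topic the extractor can return has to exist before the application writes to it.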