I am using Spring Integration Kafka to publish and consume messages with Kafka. The payload is passed correctly from producer to consumer, but the header information is getting overridden somewhere.

@ServiceActivator(inputChannel = "fromKafka")
public void processMessage(Message<?> message) throws InterruptedException,
        ExecutionException {
    try {
        // Print every header that arrived with the consumed message
        System.out.println("Headers :" + message.getHeaders().toString());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

I get the following headers:

Headers :{timestamp=1440013920609, id=f8c645f7-677b-ec32-dad0-a7b79082ef81}

I am constructing the message at the producer end like this:

Message<FeelDBMessage> message = MessageBuilder
        .withPayload(samplePayloadObj)
        .setHeader(KafkaHeaders.MESSAGE_KEY, "key")
        .setHeader(KafkaHeaders.TOPIC, "sampleTopic")
        .build();

// publish the message
publisher.publishMessage(message);

Below is the header info at the producer:

 headers={timestamp=1440013914085, id=c4159c1c-2c67-634b-ef8d-3fb026b1172e, kafka_messageKey=key, kafka_topic=sampleTopic}

Any idea why the Headers are overridden by a different value?

zer0Id0l

1 Answer

This happens because, by default, the framework uses the immutable GenericMessage.

Any manipulation of an existing message (e.g. MessageBuilder.withPayload) produces a new GenericMessage instance, with its own id and timestamp headers.

On the other hand, Kafka doesn't support any headers abstraction like JMS or AMQP do. That's why KafkaProducerMessageHandler just does this when it publishes a message to Kafka:

this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload());

As you can see, it doesn't send headers at all. So the other side (the consumer) deals only with the message from the topic as the payload, plus some system options as headers, such as topic, partition, and messageKey.

In short: we don't transfer headers over Kafka because it doesn't support them.

Artem Bilan
  • Thanks, Artem, for your reply. But the spring-integration-kafka documentation says the following: "The target topic and partition for publishing the message can be customized through the kafka_topic and kafka_partitionId headers, respectively." There is a code snippet too, which sets the headers while sending the message: `channel.send(MessageBuilder.withPayload(payload).setHeader(KafkaHeaders.MESSAGE_KEY, "key").setHeader(KafkaHeaders.TOPIC, "test").build());` – zer0Id0l Aug 24 '15 at 04:24
  • It also says: "Important. The KafkaHeaders interface contains constants used for interacting with headers. The messageKey and topic default headers now require a kafka_ prefix". If the headers are not transferred, what is the use of the KafkaHeaders class? – zer0Id0l Aug 24 '15 at 04:24
  • It is for internal Spring Integration usage, for example on the sending side for dynamic topic resolution. Please don't mix up Spring Integration Kafka Support and Apache Kafka itself. You can find something similar in the File module. – Artem Bilan Aug 24 '15 at 11:39
  • Got it! Thanks again. But can you suggest an alternative way (if possible) of passing some data from producer to consumer without modifying the payload in any way? – zer0Id0l Aug 24 '15 at 15:48
  • Yeah... No. There is no such way. You can always build a new message whose `payload` is the old `Message<?>` and provide a smart custom Kafka `(De)Serializer` for the producer and consumer, respectively. – Artem Bilan Aug 24 '15 at 15:58
  • Hi @ArtemBilan, could you please explain the purpose/use of KafkaHeaders.MESSAGE_KEY? – rahul Mar 07 '16 at 07:09
  • This header represents exactly the `key` from the Kafka protocol. It is used by the Kafka client to calculate the `partition` for a message and may also play a role in `ordering` within the topic log. At a high level, it may represent some domain-specific grouping, e.g. messages for a particular user, events in a specific geographic place, etc. – Artem Bilan Mar 07 '16 at 14:28
  • So what is the role of this configuration property: spring.cloud.stream.kafka.binder.headers=x-customHeader ? – S2201 Jun 15 '17 at 13:41
  • That is a Spring Cloud Stream question, and the answer is here (see headerMode): http://docs.spring.io/spring-cloud-stream/docs/Chelsea.SR2/reference/htmlsingle/index.html#_consumer_properties – Artem Bilan Jun 15 '17 at 13:50
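
The workaround mentioned in the comments (carry the extra data inside the payload and use a custom (De)Serializer) could look roughly like the sketch below. It is plain Java with no Kafka or Spring dependency. `HeaderEnvelope`, `pack`, and `unpack` are hypothetical names, and the byte layout (header count, key/value pairs, payload length, payload bytes) is an assumption chosen only for illustration. A real custom Kafka serializer/deserializer pair would delegate to logic of this kind.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical envelope: custom headers plus the original payload in one byte[]. */
final class HeaderEnvelope {
    public final Map<String, String> headers;
    public final byte[] payload;

    HeaderEnvelope(Map<String, String> headers, byte[] payload) {
        this.headers = headers;
        this.payload = payload;
    }

    /** Producer side: pack headers and payload into the bytes that go on the topic. */
    static byte[] pack(Map<String, String> headers, byte[] payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(headers.size());           // number of header entries
            for (Map.Entry<String, String> e : headers.entrySet()) {
                out.writeUTF(e.getKey());           // writeUTF is limited to 65535 encoded bytes
                out.writeUTF(e.getValue());
            }
            out.writeInt(payload.length);           // payload length prefix
            out.write(payload);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    /** Consumer side: restore the headers and the original payload bytes. */
    static HeaderEnvelope unpack(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            Map<String, String> headers = new LinkedHashMap<>();
            for (int i = 0; i < count; i++) {
                String key = in.readUTF();
                headers.put(key, in.readUTF());
            }
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            return new HeaderEnvelope(headers, payload);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}
```

On the producer side, the result of `pack(...)` would be sent as the message payload. On the consumer side, `unpack(...)` recovers both the custom headers and the original payload bytes, so the payload object itself does not need extra fields.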