According to this topic:
Kafka Spring Integration: Headers not coming for kafka consumer - there is no header support for Kafka

But documentation says:

spring.cloud.stream.kafka.binder.headers
The list of custom headers that will be transported by the binder.

Default: empty.

I can't get it working with spring-cloud-stream-binder-kafka: 1.2.0.RELEASE

SENDING LOG:

MESSAGE (e23885fd-ffd9-42dc-ebe3-5a78467fee1f) SENT : 
GenericMessage [payload=..., 
headers={
   content-type=application/json, 
   correlationId=51dd90b1-76e6-4b8d-b667-da25f214f383, 
   id=e23885fd-ffd9-42dc-ebe3-5a78467fee1f, 
   contentType=application/json, 
   timestamp=1497535771673
}]

RECEIVING LOG:

MESSAGE (448175f5-2b21-9a44-26b9-85f093b33f6b) RECEIVED BY HANDLER 1: 
GenericMessage [payload=..., 
headers={
    kafka_offset=36, 
    id=448175f5-2b21-9a44-26b9-85f093b33f6b, 
    kafka_receivedPartitionId=0, 
    contentType=application/json;charset=UTF-8, 
    kafka_receivedTopic=new_patient, timestamp=1497535771715
}]

MESSAGE (448175f5-2b21-9a44-26b9-85f093b33f6b) RECEIVED BY HANDLER 2 :
GenericMessage [payload=..., 
headers={
    kafka_offset=36, 
    id=448175f5-2b21-9a44-26b9-85f093b33f6b, 
    kafka_receivedPartitionId=0, 
    contentType=application/json;charset=UTF-8, 
    kafka_receivedTopic=new_patient, timestamp=1497535771715
}]

I expect to see the same message id, and to get the correlationId, on the receiving side.

application.properties:

spring.cloud.stream.kafka.binder.headers=correlationId
spring.cloud.stream.bindings.newTest.destination=new_test
spring.cloud.stream.bindings.newTestCreated.destination=new_test
spring.cloud.stream.default.consumer.headerMode=embeddedHeaders
spring.cloud.stream.default.producer.headerMode=embeddedHeaders

SENDING MESSAGE:

@Publisher(channel = "testChannel")
public Object newTest(Object param) {
    ...
    return myObject;
}
Jeff
S2201

1 Answer

Yes, it does: http://docs.spring.io/spring-cloud-stream/docs/Chelsea.SR2/reference/htmlsingle/index.html#_consumer_properties

headerMode

When set to raw, disables header parsing on input. Effective only for messaging middleware that does not support message headers natively and requires header embedding. Useful when inbound data is coming from outside Spring Cloud Stream applications.

Default: embeddedHeaders

But that is a Spring Cloud Stream feature, not part of Spring Kafka per se.
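
To illustrate the idea behind embedded headers, here is a minimal plain-Java sketch, assuming a made-up text format. It is not the binder's actual wire format, and `EmbeddedHeadersSketch`, `Msg`, `embed` and `extract` are hypothetical names. It only shows why a header such as correlationId reaches the consumer when it is listed in spring.cloud.stream.kafka.binder.headers, while a header that is not listed (and the sender's id) does not.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Illustrative only: a toy model of header embedding, NOT the binder's real format.
class EmbeddedHeadersSketch {

    // A toy message: string headers plus a string payload.
    static final class Msg {
        final Map<String, String> headers;
        final String payload;

        Msg(Map<String, String> headers, String payload) {
            this.headers = headers;
            this.payload = payload;
        }
    }

    // Producer side: write only the whitelisted headers in front of the payload.
    // Toy format: "<count>\n" + "name=value\n" per header + payload.
    // Header values containing a newline are not supported by this sketch.
    static String embed(Msg msg, List<String> whitelist) {
        StringBuilder lines = new StringBuilder();
        int count = 0;
        for (String name : whitelist) {
            String value = msg.headers.get(name);
            if (value != null) {
                lines.append(name).append('=').append(value).append('\n');
                count++;
            }
        }
        return count + "\n" + lines + msg.payload;
    }

    // Consumer side: read the embedded headers back and assign a fresh id,
    // modelling the fact that the received message is a new message object.
    static Msg extract(String wire) {
        int pos = wire.indexOf('\n');
        int count = Integer.parseInt(wire.substring(0, pos));
        pos++;
        Map<String, String> headers = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            int end = wire.indexOf('\n', pos);
            String line = wire.substring(pos, end);
            int eq = line.indexOf('=');
            headers.put(line.substring(0, eq), line.substring(eq + 1));
            pos = end + 1;
        }
        headers.put("id", UUID.randomUUID().toString());
        return new Msg(headers, wire.substring(pos));
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("id", "e23885fd-ffd9-42dc-ebe3-5a78467fee1f");
        headers.put("correlationId", "51dd90b1-76e6-4b8d-b667-da25f214f383");
        headers.put("notListed", "dropped");

        // Only correlationId is whitelisted, as in the question's properties.
        String wire = embed(new Msg(headers, "{}"), Arrays.asList("correlationId"));
        Msg received = extract(wire);
        System.out.println(received.headers.keySet());
    }
}
```

In this sketch the received message carries correlationId and a newly generated id, and the unlisted header is gone.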

Artem Bilan
  • No, but by default only a subset of headers is transported - see https://github.com/spring-cloud/spring-cloud-stream/blob/master/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/BinderHeaders.java#L39. If you want to transport your own headers, you can add their names to the property that Artem has provided. – Marius Bogoevici Jun 15 '17 at 14:03
  • Thanks for your replies! But this is actually the question: it should work according to the documentation, but it does not. Please see the update; I've added the log and configuration. – S2201 Jun 15 '17 at 14:21
  • See my update to [the other question you commented on](https://stackoverflow.com/questions/38961697/spring-cloud-stream-kafka-binder-headers-not-working-as-expected/38962861#comment76128640_38962861) - I tested that sample with Dalston.SR1 (1.2.1) and it works just fine. – Gary Russell Jun 15 '17 at 14:26
  • The updated project is in my [sandbox repo](https://github.com/garyrussell/sandbox/tree/master/so38961697). – Gary Russell Jun 15 '17 at 14:35
  • Thank you again, guys. The problem was in the message sending (added to the question). The message logged was not the message sent :-) Now I can pass the custom headers. Should the message ID be the same on producer and on receiver sides? – S2201 Jun 15 '17 at 15:10
  • The message id should be treated as an internal property (messages may undergo internal transformations between Spring Integration, Spring Kafka, Spring Cloud Stream). Ideally you should use your own custom header for conveying message identity. – Marius Bogoevici Jun 15 '17 at 15:15
  • Yes, thanks. I have one more follow-up question, but I posted it in a [separate thread](https://stackoverflow.com/questions/44571478/spring-cloud-stream-default-custom-message-headers). – S2201 Jun 15 '17 at 15:36