I am trying to use spring.cloud.stream.kafka.binder.headers to transport a custom header that I am setting based upon a previous question.

The documentation says:

spring.cloud.stream.kafka.binder.headers
The list of custom headers that will be transported by the binder.

Default: empty.

This seems to suggest that setting a list (comma separated?) will cause a custom header to be transported in the Message<>, but the header is lost as soon as the Kafka write completes.

My annotation creates the header as a part of the call to the MessagingGateway:

@MessagingGateway(name = "redemptionGateway", defaultRequestChannel = Channels.GATEWAY_OUTPUT, defaultHeaders = @GatewayHeader(name = "orderId", expression = "#gatewayMethod.name"))
public interface RedemptionGateway {
    ...
}

I can see that the header is properly created in the first preSend debug log entry:

2016-08-15 15:09:04 http-nio-8080-exec-2 DEBUG DirectChannel:430 - preSend on channel 'gatewayOutput', message: GenericMessage [payload=x.TrivialRedemption@2d052d2a[orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f], headers={orderId=create, id=5dccea6f-266e-82b9-54c6-57ec441a26ac, timestamp=1471288144882}] - {applicationSystemCode=x, clientIP=0:0:0:0:0:0:0:1, clusterId=Cluster-Id-NA, containerId=Container-Id-NA, correlationId=UNDEFINED, domainName=defaultDomain, hostName=Host-NA, messageId=10.113.21.144-eb8404d0-de93-4f94-80cb-e5b638e8aeef, userId=anonymous, webAnalyticsCorrelationId=|}

But upon the next preSend, the header is missing:

2016-08-15 15:09:05 kafka-binder- DEBUG DirectChannel:430 - preSend on channel 'enrichingInput', message: GenericMessage [payload=x.TrivialRedemption@357bd4dd[orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f], headers={kafka_offset=10, orderId=create, kafka_messageKey=null, kafka_topic=received, kafka_partitionId=0, kafka_nextOffset=11, contentType=application/x-java-object;type=x.TrivialRedemption}] - {}

My properties contain:


    spring.cloud.stream.kafka.binder.headers=orderId

Louis Alexander

1 Answer

What version of spring-cloud-stream are you using?

I just wrote a quick test case and it worked just fine...

spring.cloud.stream.kafka.binder.headers=bar
spring.cloud.stream.bindings.output.destination=foobar
spring.cloud.stream.bindings.input.destination=foobar
spring.cloud.stream.bindings.input.group=foo

App:

package com.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

@SpringBootApplication
@EnableBinding(Processor.class)
public class So38961697Application {

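    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So38961697Application.class, args);
        Foo foo = context.getBean(Foo.class);
        foo.start(); // subscribe to the input channel before sending
        foo.send();  // send a message carrying the custom "bar" header
        Thread.sleep(30000); // give the consumer time to receive and print the message
        context.close();
    }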

    @Bean
    public Foo foo() {
        return new Foo();
    }

    private static class Foo {

        @Autowired
        Processor processor;

        public void send() {
            Message<?> m = MessageBuilder.withPayload("foo")
                    .setHeader("bar", "baz")
                    .build();
            processor.output().send(m);
        }

        public void start() {
            this.processor.input().subscribe(new MessageHandler() {

                @Override
                public void handleMessage(Message<?> m) throws MessagingException {
                    System.out.println(m);
                }

            });
        }

    }

}

Result:

GenericMessage [payload=foo, headers={bar=baz, kafka_offset=0, kafka_messageKey=null, kafka_topic=foobar, kafka_partitionId=0, kafka_nextOffset=1, contentType=text/plain}]
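As a rough mental model, the `headers` property can be thought of as an allow-list: custom headers named in it survive the trip through Kafka, and other custom headers do not. The plain-Java sketch below is not the binder's code. The class and method names are made up for illustration, and the comma-separated parsing is an assumption about how a multi-value property would be written.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Conceptual illustration only; this is NOT the Kafka binder's implementation.
class HeaderAllowListSketch {

    // Parses a comma-separated value such as "orderId,bar" into a set of header names.
    static Set<String> parseHeaderNames(String propertyValue) {
        Set<String> names = new LinkedHashSet<>();
        for (String name : propertyValue.split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                names.add(trimmed);
            }
        }
        return names;
    }

    // Keeps only the headers whose names are on the allow-list; the rest are dropped.
    static Map<String, Object> transport(Map<String, Object> headers, Set<String> allowed) {
        Map<String, Object> transported = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            if (allowed.contains(entry.getKey())) {
                transported.put(entry.getKey(), entry.getValue());
            }
        }
        return transported;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("bar", "baz");           // listed, so it is kept
        headers.put("notListed", "dropped"); // hypothetical header that is not listed
        System.out.println(transport(headers, parseHeaderNames("bar"))); // prints {bar=baz}
    }
}
```

If a header you expect is missing on the consumer side, the first thing to check is whether its exact name appears in the property value.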

The complete project is here.

Edit: As noted in the comments, upgrading to 1.0.2.RELEASE solved the issue.

EDIT

Add a group to ensure the consumer consumes from the earliest message. See comment below.
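To make the timing issue concrete, here is a toy model in plain Java. It is not Kafka client code, and the class and method names are made up for illustration. It only shows why a consumer that starts at the latest offset misses a record sent before it was positioned, while one that starts at the earliest offset still sees it.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a single-partition topic; this is NOT Kafka client code.
class OffsetResetSketch {

    private final List<String> log = new ArrayList<>();

    // Appends a record to the end of the log.
    void send(String record) {
        log.add(record);
    }

    // Starting at "latest" means starting at the current end of the log.
    int latestStartOffset() {
        return log.size();
    }

    // Starting at "earliest" means starting at the beginning of the log.
    int earliestStartOffset() {
        return 0;
    }

    // Returns every record from the given offset to the end of the log.
    List<String> poll(int fromOffset) {
        return new ArrayList<>(log.subList(fromOffset, log.size()));
    }

    public static void main(String[] args) {
        OffsetResetSketch topic = new OffsetResetSketch();
        topic.send("foo"); // sent before any consumer is positioned

        // Consumer positioned at the end of the log: the earlier record is skipped.
        System.out.println("latest: " + topic.poll(topic.latestStartOffset()));     // prints latest: []
        // Consumer positioned at the start of the log: the earlier record is seen.
        System.out.println("earliest: " + topic.poll(topic.earliestStartOffset())); // prints earliest: [foo]
    }
}
```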

Gary Russell
  • I examined your dependencies and noticed you were using 1.0.2.RELEASE whereas I was using 1.0.0.RELEASE. Upgrading my project to use 1.0.2.RELEASE solved the issue. Next time, I will ensure that I am using the latest release. – Louis Alexander Aug 16 '16 at 15:06
  • Also note that your documentation link in the question points to the current snapshot, which might be ahead of the current release. The correct link to the current (1.0.2) release is [here](http://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_kafka_binder_properties); it will always point to the latest released version of the docs. – Gary Russell Aug 16 '16 at 15:19
  • "Does not work" is not much help to anyone who might help you. It's better to open a new question, refer to this one, and provide MUCH more detail. That said, with stream 1.2.x, anonymous groups consume from the latest offset; there is a timing issue in the example in that the message is sent before the consumer starts listening, so the consumer never sees the message. Add `spring.cloud.stream.bindings.input.group=foo` and it will work fine. If not, ask a new question. – Gary Russell Jun 15 '17 at 14:19
  • The updated project is in my [sandbox repo](https://github.com/garyrussell/sandbox/tree/master/so38961697). – Gary Russell Jun 15 '17 at 14:35