1

The code below can also be found in the answer to How can @MessagingGateway be configured with Spring Cloud Stream MessageChannels? When attempting to switch from RabbitMQ to Kafka, I'm encountering the following exception:

org.springframework.messaging.MessageHandlingException: Missing header 'foo' for method parameter type [class java.lang.String]
    at org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver.handleMissingValue(HeaderMethodArgumentResolver.java:100) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:103) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:135) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425) ~[spring-integration-core-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375) ~[spring-integration-core-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:360) ~[spring-integration-core-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271) ~[spring-integration-core-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188) ~[spring-integration-core-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70) ~[spring-integration-core-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64) ~[spring-integration-core-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188) ~[spring-integration-core-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$200(KafkaMessageDrivenChannelAdapter.java:63) ~[spring-integration-kafka-2.1.2.RELEASE.jar:na]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:372) ~[spring-integration-kafka-2.1.2.RELEASE.jar:na]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:352) ~[spring-integration-kafka-2.1.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:79) ~[spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:73) ~[spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.2.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) ~[spring-retry-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:73) ~[spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:39) ~[spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:792) [spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:736) [spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2100(KafkaMessageListenerContainer.java:246) [spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1025) [spring-kafka-1.1.7.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_144]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_144]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]

The only change I make to the following code is in the POM file: I replace spring-cloud-starter-stream-rabbit with spring-cloud-starter-stream-kafka. With RabbitMQ the solution works fine; the exception occurs only after I switch to Kafka. Why is the foo header not available? I'm seeing similar problems in client code I'm working on (which I can't post here), where custom headers are not sent with the Message when using Kafka (RabbitMQ works just fine). Any ideas why the header isn't being sent?

Gateway class

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;

/**
 * Gateway half of the request/reply demo.
 *
 * <p>HTTP requests handled by {@link UppercaseController} are handed to {@link StreamGateway}.
 * Each gateway method sends to an internal "enrich" channel; the matching enricher flow then
 * forwards the message to the bound {@code to-*-request} output, and the reply is expected on
 * the bound {@code to-*-reply} input.
 */
@EnableBinding({GatewayApplication.GatewayChannels.class})
@SpringBootApplication
public class GatewayApplication {

  /** Internal channel in front of the uppercase request output. */
  private static final String ENRICH_TO_UPPERCASE_REQUEST = "enrich-to-uppercase-request";

  /** Internal channel in front of the lowercase request output. */
  private static final String ENRICH_TO_LOWERCASE_REQUEST = "enrich-to-lowercase-request";

  /** Bound channels: one request output and one reply input per operation. */
  interface GatewayChannels {

    String TO_UPPERCASE_REPLY = "to-uppercase-reply";
    String TO_UPPERCASE_REQUEST = "to-uppercase-request";

    @Input(TO_UPPERCASE_REPLY)
    SubscribableChannel toUppercaseReply();

    @Output(TO_UPPERCASE_REQUEST)
    SubscribableChannel toUppercaseRequest();

    String TO_LOWERCASE_REPLY = "to-lowercase-reply";
    String TO_LOWERCASE_REQUEST = "to-lowercase-request";

    @Input(TO_LOWERCASE_REPLY)
    SubscribableChannel toLowercaseReply();

    @Output(TO_LOWERCASE_REQUEST)
    SubscribableChannel toLowercaseRequest();
  }

  /**
   * Messaging gateway used by the REST controller. Both methods send the payload together with
   * the {@code foo} and {@code bar} headers and return the reply payload as a string.
   */
  @MessagingGateway
  public interface StreamGateway {

    /**
     * Sends {@code payload} to the uppercase request flow.
     *
     * @param payload text to convert
     * @param foo value for the {@code foo} message header
     * @param bar value for the {@code bar} message header
     * @return payload of the message received on the uppercase reply channel
     */
    @Gateway(requestChannel = ENRICH_TO_UPPERCASE_REQUEST,
        replyChannel = GatewayChannels.TO_UPPERCASE_REPLY)
    String toUppercase(@Payload String payload, @Header(name = "foo") String foo,
        @Header(name = "bar") String bar);

    /**
     * Sends {@code payload} to the lowercase request flow.
     *
     * @param payload text to convert
     * @param foo value for the {@code foo} message header
     * @param bar value for the {@code bar} message header
     * @return payload of the message received on the lowercase reply channel
     */
    @Gateway(requestChannel = ENRICH_TO_LOWERCASE_REQUEST,
        replyChannel = GatewayChannels.TO_LOWERCASE_REPLY)
    String toLowercase(@Payload String payload, @Header(name = "foo") String foo,
        @Header(name = "bar") String bar);
  }

  public static void main(String[] args) {
    SpringApplication.run(GatewayApplication.class, args);
  }

  /**
   * Flow from the uppercase "enrich" channel to the bound uppercase request output.
   *
   * <p>NOTE(review): {@code headerChannelsToString} presumably turns the reply/error channel
   * headers into strings so they can cross the binder; confirm against the Spring Integration
   * documentation.
   */
  @Bean
  public IntegrationFlow toUppercaseEnricherFlow() {
    return IntegrationFlows.from(ENRICH_TO_UPPERCASE_REQUEST)
        .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
        .channel(GatewayChannels.TO_UPPERCASE_REQUEST).get();
  }

  /** Flow from the lowercase "enrich" channel to the bound lowercase request output. */
  @Bean
  public IntegrationFlow toLowercaseEnricherFlow() {
    return IntegrationFlows.from(ENRICH_TO_LOWERCASE_REQUEST)
        .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
        .channel(GatewayChannels.TO_LOWERCASE_REQUEST).get();
  }


  /**
   * REST entry points for the demo.
   *
   * <p>Declared {@code static} so the controller does not hold a hidden reference to the
   * enclosing {@link GatewayApplication} instance. The gateway is injected through the
   * constructor so the field can be {@code final}.
   */
  @RestController
  public static class UppercaseController {

    private final StreamGateway gateway;

    @Autowired
    public UppercaseController(StreamGateway gateway) {
      this.gateway = gateway;
    }

    /**
     * Converts the path variable to upper case via the messaging gateway.
     *
     * @param string text taken from the request path
     * @param foo value of the required {@code foo} request header
     * @param bar value of the required {@code bar} request header
     * @return 200 response whose body is the gateway's reply
     */
    @GetMapping(value = "/to_upper/{string}",
        produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE})
    public ResponseEntity<String> toUpper(@PathVariable("string") String string,
        @RequestHeader("foo") String foo, @RequestHeader("bar") String bar) {
      return ResponseEntity.ok(gateway.toUppercase(string, foo, bar));
    }

    /**
     * Converts the path variable to lower case via the messaging gateway.
     *
     * @param string text taken from the request path
     * @param foo value of the required {@code foo} request header
     * @param bar value of the required {@code bar} request header
     * @return 200 response whose body is the gateway's reply
     */
    @GetMapping(value = "/to_lower/{string}",
        produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE})
    public ResponseEntity<String> toLower(@PathVariable("string") String string,
        @RequestHeader("foo") String foo, @RequestHeader("bar") String bar) {
      return ResponseEntity.ok(gateway.toLowercase(string, foo, bar));
    }
  }

}

Gateway application.yml

# Gateway application settings: header mode, destinations/groups for the four bindings,
# the Kafka binder's transported-header list, and the HTTP port.
spring:
  cloud:
    stream:
      default:
        # headerMode is set explicitly for both directions; according to the discussion
        # in the comments the default would otherwise be embeddedHeaders.
        consumer:
          headerMode: headers
        producer:
          headerMode: headers
      bindings:
        to-uppercase-request:
          destination: to-uppercase-request
          producer:
            # Same name as the group the CloudStream application consumes this destination with.
            required-groups: stream-to-uppercase-request
        to-uppercase-reply:
          destination: to-uppercase-reply
          group: gateway-to-uppercase-reply
        to-lowercase-request:
          destination: to-lowercase-request
          producer:
            # Same name as the group the CloudStream application consumes this destination with.
            required-groups: stream-to-lowercase-request
        to-lowercase-reply:
          destination: to-lowercase-reply
          group: gateway-to-lowercase-reply
      kafka:
        binder:
          # Custom headers the Kafka binder transports (default: empty). replyChannel and the
          # custom foo/bar headers had to be listed for the example to work; the span* and
          # messageSent entries were added while testing with Sleuth.
          headers: replyChannel,foo,bar,spanId,spanTraceId,spanSampled,spanProcessId,spanParentSpanId,spanName,spanFlags,messageSent
server:
  # HTTP port of the gateway application.
  port: 8090

Gateway pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Build descriptor for the gateway application (REST controller plus messaging gateway). -->
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>gateway</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>gateway</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.10.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <!-- Version of the Spring Cloud BOM imported in dependencyManagement below. -->
        <spring-cloud.version>Edgware.SR1</spring-cloud.version>
    </properties>

    <dependencies>
        <!-- Kafka 0.11+ binder starter, used in place of the RabbitMQ starter.
             NOTE(review): the comments mention that other dependencies must be overridden
             for the kafka11 binder; confirm against the Spring Cloud Stream project page. -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka11</artifactId>
            <version>1.3.0.RELEASE</version>
        </dependency>
        <!-- Left disabled: the author reports that enabling spring-cloud-sleuth-stream
             brings back the missing-header failure. -->
<!--        <dependency> -->
<!--            <groupId>org.springframework.cloud</groupId> -->
<!--            <artifactId>spring-cloud-sleuth-stream</artifactId> -->
<!--        </dependency> -->
        <!-- Supplies the org.springframework.integration.dsl types used by GatewayApplication. -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-java-dsl</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <!-- Imports the Spring Cloud BOM at the version given by the spring-cloud.version property. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

CloudStream class

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;

/**
 * Worker half of the request/reply demo: listens on the two request destinations, prints the
 * {@code foo} and {@code bar} headers it received, and answers on the matching reply
 * destination with the transformed payload and the request's headers.
 */
@EnableBinding({CloudStreamApplication.CloudStreamChannels.class})
@SpringBootApplication
public class CloudStreamApplication {

  /**
   * Bound channels of this application.
   *
   * <p>NOTE(review): Spring Cloud Stream's own samples usually type {@code @Input} channels as
   * {@code SubscribableChannel} and {@code @Output} channels as {@code MessageChannel}; the
   * declarations here are the other way around. Confirm before changing.
   */
  interface CloudStreamChannels {

    String TO_UPPERCASE_REQUEST = "to-uppercase-request";
    String TO_UPPERCASE_REPLY = "to-uppercase-reply";
    String TO_LOWERCASE_REQUEST = "to-lowercase-request";
    String TO_LOWERCASE_REPLY = "to-lowercase-reply";

    @Input(TO_UPPERCASE_REQUEST)
    MessageChannel toUppercaseRequest();

    @Output(TO_UPPERCASE_REPLY)
    SubscribableChannel toUppercaseReply();

    @Input(TO_LOWERCASE_REQUEST)
    MessageChannel toLowercaseRequest();

    @Output(TO_LOWERCASE_REPLY)
    SubscribableChannel toLowercaseReply();
  }

  public static void main(String[] args) {
    SpringApplication.run(CloudStreamApplication.class, args);
  }

  /**
   * Handles an uppercase request.
   *
   * @param request incoming message carrying the text to convert
   * @param foo value of the required {@code foo} header
   * @param bar value of the required {@code bar} header
   * @return reply message with the upper-cased payload and the request's headers
   */
  @StreamListener(CloudStreamChannels.TO_UPPERCASE_REQUEST)
  @SendTo(CloudStreamChannels.TO_UPPERCASE_REPLY)
  public Message<?> processToUppercase(Message<String> request, @Header("foo") String foo,
      @Header("bar") String bar) {
    printReceivedHeaders(foo, bar);
    final String upperCased = request.getPayload().toUpperCase();
    return replyTo(request, upperCased);
  }

  /**
   * Handles a lowercase request.
   *
   * @param request incoming message carrying the text to convert
   * @param foo value of the required {@code foo} header
   * @param bar value of the required {@code bar} header
   * @return reply message with the lower-cased payload and the request's headers
   */
  @StreamListener(CloudStreamChannels.TO_LOWERCASE_REQUEST)
  @SendTo(CloudStreamChannels.TO_LOWERCASE_REPLY)
  public Message<?> processToLowercase(Message<String> request, @Header("foo") String foo,
      @Header("bar") String bar) {
    printReceivedHeaders(foo, bar);
    final String lowerCased = request.getPayload().toLowerCase();
    return replyTo(request, lowerCased);
  }

  /** Writes the two custom header values to standard output. */
  private static void printReceivedHeaders(String fooValue, String barValue) {
    System.out.println("foo header received: " + fooValue);
    System.out.println("bar header received: " + barValue);
  }

  /** Builds the reply: the given payload plus a copy of the request's headers. */
  private static Message<String> replyTo(Message<String> request, String transformedPayload) {
    MessageBuilder<String> reply = MessageBuilder.withPayload(transformedPayload);
    reply.copyHeaders(request.getHeaders());
    return reply.build();
  }

}

CloudStream application.yml

# CloudStream application settings: header mode, destinations/groups for the four bindings,
# the Kafka binder's transported-header list, and the HTTP port.
spring:
  cloud:
    stream:
      default:
        # headerMode is set explicitly for both directions; according to the discussion
        # in the comments the default would otherwise be embeddedHeaders.
        consumer:
          headerMode: headers
        producer:
          headerMode: headers
      bindings:
        to-uppercase-request:
          destination: to-uppercase-request
          group: stream-to-uppercase-request
        to-uppercase-reply:
          destination: to-uppercase-reply
          producer:
            # Same name as the group the Gateway application consumes this destination with.
            required-groups: gateway-to-uppercase-reply
        to-lowercase-request:
          destination: to-lowercase-request
          group: stream-to-lowercase-request
        to-lowercase-reply:
          destination: to-lowercase-reply
          producer:
            # Same name as the group the Gateway application consumes this destination with.
            required-groups: gateway-to-lowercase-reply
      kafka:
        binder:
          # Custom headers the Kafka binder transports (default: empty). replyChannel and the
          # custom foo/bar headers had to be listed for the example to work; the span* and
          # messageSent entries were added while testing with Sleuth.
          headers: replyChannel,foo,bar,spanId,spanTraceId,spanSampled,spanProcessId,spanParentSpanId,spanName,spanFlags,messageSent
server:
  # HTTP port of the CloudStream application.
  port: 8091

CloudStream pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Build descriptor for the CloudStream application (the stream listener side). -->
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>cloudStream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>cloudStream</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.10.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <!-- Version of the Spring Cloud BOM imported in dependencyManagement below. -->
        <spring-cloud.version>Edgware.SR1</spring-cloud.version>
    </properties>

    <dependencies>
        <!-- Kafka 0.11+ binder starter, used in place of the RabbitMQ starter.
             NOTE(review): the comments mention that other dependencies must be overridden
             for the kafka11 binder; confirm against the Spring Cloud Stream project page. -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka11</artifactId>
            <version>1.3.0.RELEASE</version>
        </dependency>
        <!-- Left disabled: the author reports that enabling spring-cloud-sleuth-stream
             brings back the missing-header failure. -->
<!--        <dependency> -->
<!--            <groupId>org.springframework.cloud</groupId> -->
<!--            <artifactId>spring-cloud-sleuth-stream</artifactId> -->
<!--        </dependency> -->
        <!-- NOTE(review): CloudStreamApplication as shown imports nothing from the
             integration DSL, so this dependency may be unnecessary here; confirm. -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-java-dsl</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <!-- Imports the Spring Cloud BOM at the version given by the spring-cloud.version property. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

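Test by sending a GET request to http://localhost:8090/to_upper/stuff (or http://localhost:8090/to_lower/stuff, the gateway listens on port 8090) and be sure to set both the foo and bar request headers, e.g. foo set to a value like bar.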

Update after Artem's and Gary's help

I have updated the example code so that the example works with both RabbitMQ and Kafka.

Keith Bennett
  • 733
  • 11
  • 25

1 Answers1

0

You need to configure a replyChannel header to be transferred:

spring.cloud.stream.kafka.binder.headers

The list of custom headers that will be transported by the binder.

Default: empty.

As you can see, by default none of the custom headers are transferred over Kafka:

https://docs.spring.io/spring-cloud-stream/docs/Ditmars.SR2/reference/htmlsingle/#_configuration_options_2

Artem Bilan
  • 113,505
  • 11
  • 91
  • 118
  • I updated my original post to be more specific to the problem I'm encountering. I figured out right after I posted that I needed to add the `spring.cloud.stream.kafka.binder.headers`, but the problem I'm having deals with the custom `foo` header in my updated example code. The custom header is sent just fine with RabbitMQ but isn't with Kafka. If I don't send the custom `foo` header in the @MessagingGateway, both RabbitMQ and Kafka work fine. – Keith Bennett Feb 05 '18 at 20:57
  • 1
    But you have not added `foo` to your property (it's not clear why you are passing `output-channel` and `replyChannel`). If you are using Kafka 0.11 or later, I suggest you use the kafka11 binder because it supports native Kafka headers. See the footnotes at the bottom of the [project page](https://cloud.spring.io/spring-cloud-stream/) for how to configure the kafka11 binder and override other dependencies. – Gary Russell Feb 05 '18 at 21:11
  • 1
    Not sure how it is related. You should ensure that the `headerMode` is `headerMode` and that you really transfer `replyChannel` and the `foo` header via the `binder.headers` property. Also, I would say that you are building a rather unusual solution for Spring Cloud Stream, so you should learn how to debug the code, not only yours but also the Framework's, to find out where and how you lose those headers. It would also be better to start developing against SCSt `2.0`: since Apache Kafka `1.0.0`, headers are a built-in feature and we should not have this problem anymore! – Artem Bilan Feb 05 '18 at 21:13
  • Artem, I am using Spring Cloud Edgware.SR1 which brings in Spring Cloud Stream Ditmars.SR3. When you mention SCSt 2.0, are you referring to Spring Cloud Stream 2.0? – Keith Bennett Feb 05 '18 at 21:39
  • Correct. That's it. But see Gary's comment - he has a good point about your missed `foo` header in the config and about `kafka11` Binder implementation. – Artem Bilan Feb 05 '18 at 21:44
  • Gary, you were correct. I was missing the `foo` property. After adding this, the example works. I am running Kafka 1.0.0, and unfortunately, I don't have access to the spring-milestones repository from my client's repo proxy, so I can't upgrade beyond what's released to Maven central. Thanks for the help. – Keith Bennett Feb 05 '18 at 22:25
  • Gary, I'm not sure why `replyChannel` has to be added (`output-channel` doesn't, I verified), but after adding it and the `foo` property, the example works. – Keith Bennett Feb 05 '18 at 22:29
  • The spring-cloud-stream-binder-kafka11 (1.3.0.RELEASE) is in maven central and supports native headers; as I said there are instructions on the project page about what else has to be overridden. – Gary Russell Feb 06 '18 at 13:23
  • A quick update on this. When I add the `spring-cloud-sleuth-stream` dependency to the pom.xml, the same error occurs that I'm seeing in the code I'm working on for my client (i.e., missing headers). I tried experimenting with removing what I thought might be related dependencies in the project I'm working on and found that the custom headers were finally sent after removing this dependency. – Keith Bennett Feb 07 '18 at 17:10
  • Also, I have the `spring-cloud-starter-stream-kafka11` dependency specified now, but I still have to specify the headers I want to send in my yml files. Unless I understood you incorrectly, I thought after upgrading to version 1.3.0 of this dependency I wouldn't have to do that as they would natively be sent. – Keith Bennett Feb 07 '18 at 17:13
  • You don't understand that situation a bit. Everything is sent only the problem that you don't propagate some required headers. And that's where you fail, not because you send some `foo` header or not. You also have to include those who are required by Sleuth – Artem Bilan Feb 07 '18 at 17:14
  • In that case you should consider `headerMode=raw` or `headers`. By default it is `embeddedHeaders`, and that requires an appropriate configuration from you. – Artem Bilan Feb 07 '18 at 17:16
  • Artem, I must not be understanding something you are stating. I have updated the example code with the latest I've tested based on your most recent comments. The posted code works, but as soon as I uncomment the `spring-cloud-sleuth-stream` dependency, it breaks. I added the Sleuth headers to the configuration, and I've tested with both `headerMode` set to `raw` and `headers`. What am I missing? I am not understanding why simply adding the `spring-cloud-sleuth-stream` dependency makes this example break. – Keith Bennett Feb 07 '18 at 18:18
  • I think that is the situation when we need to start a new thread. So, far I see it works. The problem with the Sleuth should be consider from the fresh page. Not sure what you mean doesn't work, but without stack trace we can't proceed. Please, start a new SO thread and we'll see. – Artem Bilan Feb 07 '18 at 18:24
  • Continued at https://stackoverflow.com/questions/48671269/migrating-from-rabbitmq-to-kafka-and-encountering-possible-issue-with-spring-clo. – Keith Bennett Feb 07 '18 at 18:54