
We use spring-cloud-stream and are planning to upgrade our Kafka version.
Our applications use spring-cloud-stream:2.0.0 (spring-kafka 2.1.7) with Apache Kafka server 1.0.1, plus spring-cloud-sleuth:2.0.0 for tracing.
We are going to upgrade our Kafka server to version 2.3.0, which requires an upgrade to spring-boot 2.2.x (Hoxton) with spring-cloud-sleuth:2.2.0 and spring-cloud-stream:3.0.3 (Horsham.SR3).
We have ~200 applications that use Kafka, so the upgrade will be gradual; in the intermediate state we will have producers on the newer version and consumers on the old version.
Our consumers use @StreamListener.

During our tests, the old consumers failed to parse most headers of type String and logged the following:

ERROR 27448 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$4  : Could not decode json type: ecb89ccb3e79418b for key: X-B3-TraceId
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ecb89ccb3e79418b': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"ecb89ccb3e79418b"; line: 1, column: 33]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091) ~[jackson-databind-2.9.6.jar:2.9.6]
    at org.springframework.kafka.support.DefaultKafkaHeaderMapper.lambda$toHeaders$1(DefaultKafkaHeaderMapper.java:233) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at java.lang.Iterable.forEach(Iterable.java:75) ~[na:1.8.0_221]
    at org.springframework.kafka.support.DefaultKafkaHeaderMapper.toHeaders(DefaultKafkaHeaderMapper.java:216) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.toHeaders(KafkaMessageChannelBinder.java:554) ~[spring-cloud-stream-binder-kafka-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:106) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:229) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
...

Meanwhile, the types header that accompanies the record contains:

{spanTraceId=java.lang.String, spanId=java.lang.String, spanParentSpanId=java.lang.String, nativeHeaders=org.springframework.util.LinkedMultiValueMap, X-B3-SpanId=java.lang.String, X-B3-ParentSpanId=java.lang.String, scst_partition=java.lang.Integer, X-B3-Sampled=java.lang.String, X-B3-TraceId=java.lang.String, spanSampled=java.lang.String, contentType=java.lang.String}

For instance, the X-B3-SpanId header added by Sleuth has type String and the value ecb89ccb3e79418b, which is not a JSON string (it is not quoted), so the ObjectMapper fails when converting it to a String object here:

headers.put(h.key(), getObjectMapper().readValue(h.value(), type))

It looks like the ObjectMapper should not be used for headers of type String; because it is, our old consumers are failing.
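To make the mismatch concrete, here is a minimal stdlib-only sketch. It is a simplified model, not the actual spring-kafka or Jackson code: the class and method names are hypothetical, JSON escaping is omitted, and the "old consumer" is reduced to a check that the bytes form a quoted JSON string.

```java
import java.nio.charset.StandardCharsets;

public class HeaderEncodingSketch {

    // Models a newer producer's default: the raw String bytes, no JSON quoting.
    static byte[] encodeRaw(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Models JSON encoding of a String header: the value wrapped in double quotes.
    // Escaping of special characters is omitted to keep the sketch short.
    static byte[] encodeJson(String value) {
        return ("\"" + value + "\"").getBytes(StandardCharsets.UTF_8);
    }

    // Simplified stand-in for an old consumer that always JSON-decodes
    // String-typed headers: anything that is not a quoted string is rejected.
    static String decodeAsJsonString(byte[] bytes) {
        String text = new String(bytes, StandardCharsets.UTF_8);
        if (text.length() < 2 || !text.startsWith("\"") || !text.endsWith("\"")) {
            throw new IllegalArgumentException("Unrecognized token '" + text + "'");
        }
        return text.substring(1, text.length() - 1);
    }

    public static void main(String[] args) {
        String traceId = "ecb89ccb3e79418b";

        // JSON-encoded header: the old-style decoder accepts it.
        System.out.println(decodeAsJsonString(encodeJson(traceId)));

        // Raw header: the old-style decoder rejects it.
        try {
            decodeAsJsonString(encodeRaw(traceId));
        } catch (IllegalArgumentException e) {
            System.out.println("decode failed: " + e.getMessage());
        }
    }
}
```

The point of the sketch is only that the two sides must agree on the wire format of String headers; either the producer keeps emitting quoted JSON strings, or the consumer stops JSON-decoding them.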

Is there a way to prevent this issue when using a new producer with an old consumer?

Yuval Simhon

1 Answer

You can configure the DefaultKafkaHeaderMapper to be compatible with older versions:

    /**
     * Set to true to encode String-valued headers as JSON ("..."), by default just the
     * raw String value is converted to a byte array using the configured charset. Set to
     * true if a consumer of the outbound record is using Spring for Apache Kafka version
     * less than 2.3
     * @param encodeStrings true to encode (default false).
     * @since 2.3
     */
    public void setEncodeStrings(boolean encodeStrings) {
        this.encodeStrings = encodeStrings;
    }

Also see the binder property spring.cloud.stream.kafka.binder.headerMapperBeanName, which tells the binder to use your custom header mapper bean:

https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.0.10.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_binder_properties
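A rough sketch of how this might be wired on the newer producers follows. It uses `BinderHeaderMapper` rather than `DefaultKafkaHeaderMapper`, because the comment thread below reports that only the former handled the contentType header correctly. The bean name and class name are hypothetical, and the package of `BinderHeaderMapper` and its `setEncodeStrings` setter are assumed from the binder being described as a clone of `DefaultKafkaHeaderMapper`; check both against the binder version you actually use.

```java
import org.springframework.cloud.stream.binder.kafka.BinderHeaderMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.KafkaHeaderMapper;

@Configuration
public class LegacyHeaderMapperConfig {

    // The bean name is hypothetical; it only has to match the binder property:
    // spring.cloud.stream.kafka.binder.headerMapperBeanName=legacyCompatibleHeaderMapper
    @Bean
    public KafkaHeaderMapper legacyCompatibleHeaderMapper() {
        BinderHeaderMapper mapper = new BinderHeaderMapper();
        // Encode String headers as JSON ("...") so that consumers on
        // spring-kafka versions older than 2.3 can still decode them.
        mapper.setEncodeStrings(true);
        return mapper;
    }
}
```

This belongs on the producer side only; once every consumer has been upgraded, the flag can presumably be dropped again.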

Gary Russell
  • Hi @gary , this suggestion isn't working. We've tried to produce using **Hoxton.SR3** with `encodeStrings flag = true` being set on the `DefaultKafkaHeaderMapper` as suggested. With this setup the consumer is able to process all String headers but fails on content-type header. The type value of this header was changed to `org.springframework.util.MimeType` (due to the flag change) and the value is "application/json" hence it's trying to convert it to JSON and fails here: `headers.put(h.key(), getObjectMapper().readValue(h.value(), type));` with NPE. – Yuval Simhon Dec 20 '20 at 07:08
  • Try using `BinderHeaderMapper` with the property set instead of `DefaultKafkaHeaderMapper`; it was originally a clone but I see some additional code there to deal with `MimeType`. – Gary Russell Dec 21 '20 at 14:38
  • Thanks @gary - `BinderHeaderMapper` did it, no other issues related to headers starting from `spring-kafka:2.3.7` – Yuval Simhon Jan 04 '21 at 09:16