1

I am using spring-cloud-stream kafka binder to consume messages from a kafka topic. The source system is sending the json message in ascii. When My consumer listens to the topic it throws

o.s.c.s.b.k.KafkaMessageChannelBinder    : Could not convert message: 7B22736..

Is there any property that I can set in my .yml file to deserialize it? or is there an example that I can look into?

sw-enthusiast
  • 199
  • 1
  • 2
  • 17
  • @ka2 I saw your comment in [link](http://stackoverflow.com/questions/38548384/kafka-source-in-spring-cloud-data-flow) did you ever found the solution for your problem? – sw-enthusiast Mar 07 '17 at 17:02

3 Answers3

0

I am not sure what you mean by json in hexadecimal-binary data if you mean it's ascii data in a byte[], try adding spring.cloud.stream.bindings.input.content-type=text/plain (or application/json).

Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • Edited my question, The error is thrown when I am using content-type:application/json. – sw-enthusiast Mar 07 '17 at 19:01
  • Did you try text/plain? Or you can set the `spring.kafka.consumer.valueDerserializer` to `org.apache.kafka.common.serialization.StringDeserializer`. – Gary Russell Mar 07 '17 at 19:10
  • When I used text/plain it's able to convert to the json however in the console it still throws the same error. I cannot find the spring.kafka.consumer.valueDerserializer property in the spring cloud stream kafka binder that I am using. – sw-enthusiast Mar 07 '17 at 21:27
  • Oh, right, sorry; that was just added yesterday (Boot 1.5 support) [commit here](https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/commit/6a31e9c94f604f7054dd4f50f905da4d8edd5c78). I am not sure how to override the key/value deserializers with earlier binders. – Gary Russell Mar 07 '17 at 21:41
0

You can look for the configuration property here: http://docs.spring.io/spring-cloud-stream/docs/Brooklyn.SR2/reference/htmlsingle/#_kafka_consumer_properties

In your case, you can set this by doing the following:

spring.cloud.stream.kafka.bindings.<channelName>.consumer.configuration.value.deserializer=<Deserialzier class>

Kafka binder takes all properties from the configuration map. Thus you can use any generic Kafka consumer properties and pass them this way.

sobychacko
  • 5,099
  • 15
  • 26
0

When I added content-type:plain/text and spring.cloud.stream.bindings.<subscriptionChannel>.consumer.‌​headerMode:raw it worked.

Thank you!

sw-enthusiast
  • 199
  • 1
  • 2
  • 17