When I consume a message from a Kinesis stream, I get some junk characters along with headers, etc.

    @StreamListener(Processor.INPUT)
    public void receive(String message) {       
        System.out.println("Message recieved: "+message);
        throw new RuntimeException("Exception thrown");
    }

    @StreamListener("errorChannel")
    public void transform(ErrorMessage errorMessage) throws UnsupportedEncodingException {      

        // original payload
        System.out.println("Error Original Message Payload: "+new String((byte[])errorMessage.getOriginalMessage().getPayload(), "UTF-8"));
        System.out.println("Error Original Message Stream channel: "+errorMessage.getOriginalMessage().getHeaders().get("aws_receivedStream"));
    }

application.yml:

    spring:
      cloud:
        stream:
          bindings:
            input:
              group: abcd
              destination: stream
              content-type: application/json
              errorChannelEnabled: true
              consumer:
                headerMode: raw

I get output with junk characters in both the listener and the errorChannel.

I am trying to extract the original message in the errorChannel. Is this the right way to convert the message bytes?

    Message recieved: ?contentType "application/json"{"aa":"cc"}
Patan
  • You need `headerMode: raw` on the producer side too; or `headerMode: embedded` on the consumer side so that the headers are stripped off. – Gary Russell Mar 14 '18 at 18:13
  • @GaryRussell Thank you. I think the default header mode on the consumer side is embedded. I tried with that as well. – Patan Mar 15 '18 at 02:05
  • @GaryRussell In the stream listener, I get the correct message. But in the errorChannel, I get the message with the content type and special characters. I am converting it like this: `new String((byte[])errorMessage.getOriginalMessage().getPayload(), "UTF-8")`. Can you help? – Patan Mar 16 '18 at 07:24
  • In that case, you need to use `MessageHeaderUtils.extractHeaders()` on the `originalMessage`. – Gary Russell Mar 16 '18 at 13:54
  • See my answer. I wonder if we should do the embedded headers extraction *before* sending downstream at all, at the Binder level. That way, the `originalMessage` on the `errorChannel` would already be transformed. – Artem Bilan Mar 20 '18 at 21:57

1 Answer

AWS Kinesis doesn't provide any headers entity. So, to support such functionality in Spring Cloud Stream, we embed the headers into the body of the Kinesis record. For this purpose, the headerMode is embeddedHeaders by default in the Kinesis Binder, and for symmetry between producer and consumer this option must not be changed.

The framework provides an out-of-the-box EmbeddedHeadersChannelInterceptor for the target @StreamListener channels, so the embedded headers are extracted and properly populated into the message that is sent to the listener.

When we handle errors in the errorChannel, errorMessage.getOriginalMessage() is indeed the original, non-transformed message. Therefore the payload of that message is the byte[] of the record body, which still contains the embedded headers.

If you would like to parse them properly, you should use this utility:

    EmbeddedHeaderUtils.extractHeaders((Message<byte[]>) message, true);
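
To illustrate why the raw payload prints as `?contentType "application/json"{"aa":"cc"}`, here is a minimal, self-contained sketch that decodes a record body by hand. It is not the framework's implementation. It assumes the embedded layout is a `0xff` marker byte, a one-byte header count, then for each header a one-byte name length, the name, a four-byte big-endian value length, and the JSON-encoded value, followed by the actual payload. The class `EmbeddedHeaderLayoutSketch`, its `Decoded` holder, and `sampleRecord()` are hypothetical names used only for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: hand-decodes the ASSUMED embedded-headers layout
// (0xff, count, [nameLen(1), name, valueLen(4), jsonValue]..., payload).
public class EmbeddedHeaderLayoutSketch {

    /** Header name mapped to its raw JSON value, plus the remaining payload bytes. */
    public static final class Decoded {
        public final Map<String, String> headers;
        public final byte[] payload;

        Decoded(Map<String, String> headers, byte[] payload) {
            this.headers = headers;
            this.payload = payload;
        }
    }

    public static Decoded decode(byte[] record) {
        Map<String, String> headers = new LinkedHashMap<>();
        // No marker byte: treat the whole record as a plain payload.
        if (record.length < 2 || (record[0] & 0xff) != 0xff) {
            return new Decoded(headers, record);
        }
        ByteBuffer buf = ByteBuffer.wrap(record);
        buf.get();                        // skip the 0xff marker
        int count = buf.get() & 0xff;     // number of embedded headers
        for (int i = 0; i < count; i++) {
            byte[] name = new byte[buf.get() & 0xff];
            buf.get(name);
            byte[] value = new byte[buf.getInt()];
            buf.get(value);
            headers.put(new String(name, StandardCharsets.UTF_8),
                        new String(value, StandardCharsets.UTF_8));
        }
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);                 // whatever is left is the real payload
        return new Decoded(headers, payload);
    }

    /** Builds a sample record in the assumed layout, mirroring the output in the question. */
    public static byte[] sampleRecord() {
        byte[] name = "contentType".getBytes(StandardCharsets.UTF_8);
        byte[] value = "\"application/json\"".getBytes(StandardCharsets.UTF_8);
        byte[] body = "{\"aa\":\"cc\"}".getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + 1 + name.length + 4 + value.length + body.length);
        buf.put((byte) 0xff).put((byte) 1)
           .put((byte) name.length).put(name)
           .putInt(value.length).put(value)
           .put(body);
        return buf.array();
    }

    public static void main(String[] args) {
        Decoded decoded = decode(sampleRecord());
        System.out.println("headers: " + decoded.headers);
        System.out.println("payload: " + new String(decoded.payload, StandardCharsets.UTF_8));
    }
}
```

Converting the whole `byte[]` with `new String(..., "UTF-8")` turns the marker and length bytes into the unprintable characters seen in the question. In a real application, prefer the `EmbeddedHeaderUtils` call above over hand-rolled parsing like this.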
Artem Bilan
  • @artem-bilan Is there a way to add headers to body using spring integration? I mean if I manually add a header map in body of my event to kinesis, then using `EmbeddedHeaderUtils.extractHeaders` would still extract the headers? – sansari Jun 25 '18 at 14:13
  • Please, raise a new question with more details. SO is not a chat. – Artem Bilan Jun 25 '18 at 14:48