I get this error when I try to consume records from Kinesis: class [B cannot be cast to class java.util.List ([B and java.util.List are in module java.base of loader 'bootstrap'), so it looks like the payload is arriving as a different type.

Note: if I replace batch with record and change the consumer's signature to receive a single message, Consumer<byte[]> fizzBuzzConsumer(), it works fine and I can convert the byte array to a string with no issues. So why doesn't it work in batch mode?

spring:
  profiles:
    active: local
  application:
    name: my-consumer
  cloud:
    function:
      definition: fizzBuzzConsumer
    stream:
      function:
        bindings:
          fizzBuzzConsumer-in-0: input
      bindings:
        input:
          consumer:
            batch-mode: true
            use-native-decoding: true
          destination: kinesis-writer-stream
          content-type: text/plain
          group: kinesis-reader-app-group

      kinesis:
        bindings:
          kinesis-writer-stream:
            consumer:
              listener-mode: batch
              checkpoint-mode: periodic
              checkpoint-interval: 3000
              idle-between-polls: ${KINESIS_CONSUMER_IDLE_BETWEEN_POLLS:1000}
              consumer-backoff: ${KINESIS_CONSUMER_BACKOFF:1000}
              records-limit: ${KINESIS_CONSUMER_RECORDS_LIMIT:2000}
              shard-iterator-type: TRIM_HORIZON
              worker-id: kinesis-reader-worker-id

        binder:
          checkpoint:
            table: kinesis-reader-stream-metadata
          locks:
            table: kinesis-reader-lock-registry
            lease-duration: 30
            refresh-period: 3000
            read-capacity: 10
          kpl-kcl-enabled: true
          auto-create-stream: true
          auto-add-shards: true
          min-shard-count: 1

And here is my consumer:

@Bean
public Consumer<Message<List<byte[]>>> fizzBuzzConsumer() {
    return message -> {
        for (byte[] record: message.getPayload()) {
            String json = new String(Objects.requireNonNull(record), StandardCharsets.UTF_8);
            log.info("New Record comes.... {}", json);
        }
    };
}

Error:

2022-11-03 22:22:13.496 ERROR 549022 --- [cTaskExecutor-4] s.i.a.i.k.KclMessageDrivenChannelAdapter : Got an exception during sending a 'GenericMessage [payload=byte[1165], headers={aws_shard=shardId-000000000000, id=d1433639-dcf9-53eb-9980-cc893406c3e8, sourceData=UserRecord [subSequenceNumber=0, explicitHashKey=null, aggregated=false, getSequenceNumber()=49634843198456367976521810433671650607349108380513861634, getData()=java.nio.HeapByteBuffer[pos=0 lim=1165 cap=1165], getPartitionKey()=1082619945], aws_receivedPartitionKey=1082619945, aws_receivedStream=kinesis-writer-stream, aws_receivedSequenceNumber=49634843198456367976521810433671650607349108380513861634, timestamp=1667510514911}]'
for the 'UserRecord [subSequenceNumber=0, explicitHashKey=null, aggregated=false, getSequenceNumber()=49634843198456367976521810433671650607349108380513861634, getData()=java.nio.HeapByteBuffer[pos=0 lim=1165 cap=1165], getPartitionKey()=1082619945]'.
Consider to use 'errorChannel' flow for the compensation logic.

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@4f4bbdbb]; nested exception is java.lang.ClassCastException: class [B cannot be cast to class java.util.List ([B and java.util.List are in module java.base of loader 'bootstrap')
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-5.5.15.jar:5.5.15]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65) ~[spring-integration-core-5.5.15.jar:5.5.15]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.15.jar:5.5.15]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.15.jar:5.5.15]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.15.jar:5.5.15]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.15.jar:5.5.15]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.15.jar:5.5.15]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.15.jar:5.5.15]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.23.jar:5.3.23]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.23.jar:5.3.23]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.23.jar:5.3.23]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.23.jar:5.3.23]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216) ~[spring-integration-core-5.5.15.jar:5.5.15]
    at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter.access$1600(KclMessageDrivenChannelAdapter.java:84) ~[spring-integration-aws-2.5.1.jar:na]
    at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.performSend(KclMessageDrivenChannelAdapter.java:520) ~[spring-integration-aws-2.5.1.jar:na]
    at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.processSingleRecord(KclMessageDrivenChannelAdapter.java:435) ~[spring-integration-aws-2.5.1.jar:na]
    at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.processRecords(KclMessageDrivenChannelAdapter.java:418) ~[spring-integration-aws-2.5.1.jar:na]
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.V1ToV2RecordProcessorAdapter.processRecords(V1ToV2RecordProcessorAdapter.java:42) ~[amazon-kinesis-client-1.14.8.jar:na]
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.callProcessRecords(ProcessTask.java:221) ~[amazon-kinesis-client-1.14.8.jar:na]
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:176) ~[amazon-kinesis-client-1.14.8.jar:na]
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49) ~[amazon-kinesis-client-1.14.8.jar:na]
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24) ~[amazon-kinesis-client-1.14.8.jar:na]
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.util.List ([B and java.util.List are in module java.base of loader 'bootstrap')
mibrahim.iti
  • `[B` is a byte array (`byte[]`), so the exception is telling you that you have a `byte[]` that you're trying to cast to a `List`. I'm not as familiar with this stack, but my guess would be that you actually have a `Message<byte[]>`, not a `Message<List<byte[]>>`. I'm not sure what the best/most idiomatic way to list-ify it is, but hopefully that should at least point you in the right direction. (More specifically: `message.getPayload()` gets compiled down to `(List) message.getPayload()` after generics are erased, and I think that cast is what's causing you problems.) – yshavit Nov 04 '22 at 00:22
  • Yes, I know that, but I am asking why the consumer does not receive a List when listener-mode is set to batch. It works fine if I receive one record at a time with listener-mode set to record. – mibrahim.iti Nov 04 '22 at 07:44
  • Try this `Consumer>`. That's what we claim in docs: https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/main/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc#kinesis-consumer-properties. Not sure though why `Message>` doesn't work. Need to investigate... – Artem Bilan Nov 04 '22 at 14:44
  • This one `Consumer>>` gives me a list of `byte[]` – Artem Bilan Nov 04 '22 at 15:58
  • Unfortunately, neither Consumer> nor Consumer>> works for me; both give the same error stack. I am using spring-cloud-stream-binder-kinesis version 2.2.0 and testing with localstack version 1.2.0 against a local AWS Kinesis, so could you please share the configuration of the sample project you used to test that behavior? – mibrahim.iti Nov 04 '22 at 21:12
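
To make the erasure point from the first comment concrete, here is a minimal, stdlib-only sketch. It does not use Spring or the Kinesis binder: `Envelope` and `consumeAsBatch` are hypothetical names invented for illustration, standing in for a message wrapper and a consumer that is declared to receive a batch. It only shows why a `byte[]` payload fails with this exact `ClassCastException` when the code expects a `List<byte[]>`; it says nothing about why the binder delivers single records.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

public class ErasureCastDemo {

    /** Hypothetical stand-in for a message wrapper; this is NOT Spring's Message. */
    static final class Envelope<T> {
        private final T payload;

        Envelope(T payload) {
            this.payload = payload;
        }

        T getPayload() {
            return payload;
        }
    }

    /** Treats whatever arrives as a batch, as a batch-typed consumer signature would. */
    @SuppressWarnings("unchecked")
    public static String consumeAsBatch(Object actualPayload) {
        Envelope<?> raw = new Envelope<Object>(actualPayload);
        // Generics are erased at runtime, so this unchecked cast never fails here.
        Envelope<List<byte[]>> message = (Envelope<List<byte[]>>) raw;
        try {
            StringBuilder out = new StringBuilder();
            // The compiler inserts a cast to List at this call; a byte[] payload fails here.
            for (byte[] record : message.getPayload()) {
                out.append(new String(record, StandardCharsets.UTF_8));
            }
            return out.toString();
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        byte[] single = "fizz".getBytes(StandardCharsets.UTF_8);
        System.out.println(consumeAsBatch(List.of(single))); // payload really is a List
        System.out.println(consumeAsBatch(single));          // payload is a bare byte[]
    }
}
```

The first call prints the decoded record, the second prints `ClassCastException`. In the stack trace above the failing send comes from `processSingleRecord`, which suggests the adapter is delivering one record at a time while the function expects a list.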

1 Answer

For me (using Spring Boot 3.0.2) the error was, ironically, caused by how the message was sent to RabbitMQ. On the producer side, I was sending the messages through the standard RabbitTemplate and not Spring's StreamBridge.

While I am not entirely familiar with Spring Cloud Stream's message decoding feature, RabbitTemplate in its standard configuration sends a base64-encoded Java object, including its package name.

This also persists if RabbitTemplate is set to use JSON, i.e.

var eventPayload = new SomeEventDto(12332, "some string val", "another string val");
template.setMessageConverter(new Jackson2JsonMessageConverter());
template.convertAndSend("rabbit-stream-exchange", "rabbit-stream-queue", eventPayload);

Sending the message with the code above adds an additional header to the RabbitMQ message:

__TypeId__: com.example.example123.SomeEventDto

If the Spring Cloud Stream receiver gets a message with the __TypeId__ header set, it fails to cast the message into the correct object, seemingly even if the header names the exact type it is supposed to deserialize into. If the same payload is (manually) sent without this header, with all other parameters unchanged, the message is consumed successfully.

Thus, my fix was to send the message through Spring Cloud Stream instead:

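import java.io.IOException;

import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.function.StreamBridge;
// Assumption: MessageBuilder from spring-messaging; the original does not say which one is used.
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class EventPublisher {
  private final StreamBridge streamBridge;

  // Publishes the event through the output binding instead of RabbitTemplate.
  public void sendSomeEvent(SomeEventDto event) throws IOException {
    streamBridge.send("some-event-binding-out-0", MessageBuilder.withPayload(event).build());
  }
}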

The message sent this way was serialized as JSON and sent without the `__TypeId__` header that was confusing the consumer.
Simas Joneliunas