I get this error when I try to consume records from Kinesis: `class [B cannot be cast to class java.util.List ([B and java.util.List are in module java.base of loader 'bootstrap')`, so it looks like the payload is arriving as a different type than the one I declared.
Note: if I replace batch with record and change the consumer signature to receive a single message, `Consumer<byte[]> fizzBuzzConsumer()`, it works fine and I can convert the byte array to a string with no issues. So why doesn't it work in batch mode?
spring:
  profiles:
    active: local
  application:
    name: my-consumer
  cloud:
    function:
      definition: fizzBuzzConsumer
    stream:
      function:
        bindings:
          fizzBuzzConsumer-in-0: input
      bindings:
        input:
          consumer:
            batch-mode: true
            use-native-decoding: true
          destination: kinesis-writer-stream
          content-type: text/plain
          group: kinesis-reader-app-group
      kinesis:
        bindings:
          kinesis-writer-stream:
            consumer:
              listener-mode: batch
              checkpoint-mode: periodic
              checkpoint-interval: 3000
              idle-between-polls: ${KINESIS_CONSUMER_IDLE_BETWEEN_POLLS:1000}
              consumer-backoff: ${KINESIS_CONSUMER_BACKOFF:1000}
              records-limit: ${KINESIS_CONSUMER_RECORDS_LIMIT:2000}
              shard-iterator-type: TRIM_HORIZON
              worker-id: kinesis-reader-worker-id
        binder:
          checkpoint:
            table: kinesis-reader-stream-metadata
          locks:
            table: kinesis-reader-lock-registry
            lease-duration: 30
            refresh-period: 3000
            read-capacity: 10
          kpl-kcl-enabled: true
          auto-create-stream: true
          auto-add-shards: true
          min-shard-count: 1
And here is my consumer:
@Bean
public Consumer<Message<List<byte[]>>> fizzBuzzConsumer() {
    return message -> {
        for (byte[] record : message.getPayload()) {
            String json = new String(Objects.requireNonNull(record), StandardCharsets.UTF_8);
            log.info("New Record comes.... {}", json);
        }
    };
}
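For comparison, the single-record variant mentioned above (the one that works) can be sketched without Spring roughly as follows. The class name, the `decode` helper, and the `sink` parameter are hypothetical and only there to make the sketch runnable on its own; in the real application the method is a `@Bean` and logs each record instead of collecting it.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

public class SingleRecordConsumerSketch {

    // Decodes one record the same way the batch consumer does for each element.
    static String decode(byte[] record) {
        return new String(Objects.requireNonNull(record), StandardCharsets.UTF_8);
    }

    // Single-record signature from the question: the payload is one byte[].
    // The sink list is a hypothetical replacement for log.info(...).
    static Consumer<byte[]> fizzBuzzConsumer(List<String> sink) {
        return record -> sink.add(decode(record));
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        fizzBuzzConsumer(seen).accept("{\"n\":15}".getBytes(StandardCharsets.UTF_8));
        System.out.println(seen);
    }
}
```

The only difference from the batch version is the declared payload type: here the function receives one `byte[]` per invocation, which matches what the stack trace below shows being delivered (`payload=byte[1165]`).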
Error:
2022-11-03 22:22:13.496 ERROR 549022 --- [cTaskExecutor-4] s.i.a.i.k.KclMessageDrivenChannelAdapter : Got an exception during sending a 'GenericMessage [payload=byte[1165], headers={aws_shard=shardId-000000000000, id=d1433639-dcf9-53eb-9980-cc893406c3e8, sourceData=UserRecord [subSequenceNumber=0, explicitHashKey=null, aggregated=false, getSequenceNumber()=49634843198456367976521810433671650607349108380513861634, getData()=java.nio.HeapByteBuffer[pos=0 lim=1165 cap=1165], getPartitionKey()=1082619945], aws_receivedPartitionKey=1082619945, aws_receivedStream=kinesis-writer-stream, aws_receivedSequenceNumber=49634843198456367976521810433671650607349108380513861634, timestamp=1667510514911}]'
for the 'UserRecord [subSequenceNumber=0, explicitHashKey=null, aggregated=false, getSequenceNumber()=49634843198456367976521810433671650607349108380513861634, getData()=java.nio.HeapByteBuffer[pos=0 lim=1165 cap=1165], getPartitionKey()=1082619945]'.
Consider to use 'errorChannel' flow for the compensation logic.
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@4f4bbdbb]; nested exception is java.lang.ClassCastException: class [B cannot be cast to class java.util.List ([B and java.util.List are in module java.base of loader 'bootstrap')
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.23.jar:5.3.23]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.23.jar:5.3.23]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.23.jar:5.3.23]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.23.jar:5.3.23]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter.access$1600(KclMessageDrivenChannelAdapter.java:84) ~[spring-integration-aws-2.5.1.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.performSend(KclMessageDrivenChannelAdapter.java:520) ~[spring-integration-aws-2.5.1.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.processSingleRecord(KclMessageDrivenChannelAdapter.java:435) ~[spring-integration-aws-2.5.1.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.processRecords(KclMessageDrivenChannelAdapter.java:418) ~[spring-integration-aws-2.5.1.jar:na]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.V1ToV2RecordProcessorAdapter.processRecords(V1ToV2RecordProcessorAdapter.java:42) ~[amazon-kinesis-client-1.14.8.jar:na]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.callProcessRecords(ProcessTask.java:221) ~[amazon-kinesis-client-1.14.8.jar:na]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:176) ~[amazon-kinesis-client-1.14.8.jar:na]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49) ~[amazon-kinesis-client-1.14.8.jar:na]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24) ~[amazon-kinesis-client-1.14.8.jar:na]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.util.List ([B and java.util.List are in module java.base of loader 'bootstrap')
I'm not sure what the best/most idiomatic way to list-ify the payload is, but hopefully that should at least point you in the right direction. (More specifically: `message.getPayload()` gets compiled down to `(List) message.getPayload()` after generics are erased, and I think that cast is what's causing you problems.)
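The cast described in the comment above can be reproduced in plain Java, without Spring or Kinesis. This is only a sketch: `Holder` is a hypothetical stand-in for `Message<T>`, and `deliver` imitates a framework that hands over a raw `byte[]` payload to code that declared `List<byte[]>`.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

public class ErasureCastDemo {

    // Hypothetical stand-in for Message<T>; only getPayload() matters here.
    static final class Holder<T> {
        private final T payload;
        Holder(T payload) { this.payload = payload; }
        T getPayload() { return payload; }
    }

    // Imitates a framework that ignores the declared generic type:
    // the unchecked cast succeeds at runtime because generics are erased.
    @SuppressWarnings("unchecked")
    static Holder<List<byte[]>> deliver(Object payload) {
        return (Holder<List<byte[]>>) (Object) new Holder<>(payload);
    }

    // Returns true if iterating the payload as a List fails with ClassCastException.
    static boolean failsWithClassCast(Object payload) {
        Holder<List<byte[]>> message = deliver(payload);
        try {
            // The compiler inserts a cast to List on the erased getPayload() result.
            for (byte[] record : message.getPayload()) {
                new String(record, StandardCharsets.UTF_8);
            }
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        byte[] single = "{}".getBytes(StandardCharsets.UTF_8);
        System.out.println("byte[] payload fails: " + failsWithClassCast(single));
        System.out.println("List payload fails: " + failsWithClassCast(List.of(single)));
    }
}
```

The failure appears at the point where the payload is first used as a `List`, not where the message is created, which is consistent with the exception surfacing inside the message handler in the stack trace.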
That's what we claim in docs: https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/main/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc#kinesis-consumer-properties. Not sure though why `Message<List<byte[]>>` doesn't work. Need to investigate...
– Artem Bilan Nov 04 '22 at 14:44
[...] and `Consumer<…>` doesn't work for me either; it gives the same error stack. I am using spring-cloud-stream-binder-kinesis version 2.2.0 and testing with localstack version 1.2.0 against a local AWS Kinesis. Could you please share the configuration of the sample project you used to test that behavior?
– mibrahim.iti Nov 04 '22 at 21:12