I have a simple Spring Integration flow that receives messages from an AmqpInboundChannelAdapter. After some transformations, the messages must be put into an AWS Kinesis data stream.
@Bean
public AmqpInboundChannelAdapter inboundChannelAdapter(
        ConnectionFactory connectionFactory,
        @Qualifier(INPUT_CHANNEL_NAME) MessageChannel inputChannel) {
    return Amqp.inboundAdapter(connectionFactory, "amqpInputQueue")
            .configureContainer(c -> {
                c.acknowledgeMode(AcknowledgeMode.MANUAL);
                c.concurrentConsumers(1);
                c.prefetchCount(250);
                c.defaultRequeueRejected(false);
            })
            .outputChannel(inputChannel)
            .get();
}

@Bean
public IntegrationFlow myFlow(
        @Qualifier(INPUT_CHANNEL_NAME) MessageChannel input,
        @Qualifier(OUTBOUND_HANDLER) MessageHandler outboundHandler) {
    return IntegrationFlows
            .from(input)
            .transform(....)
            .enrichHeaders(h -> h.headerFunction(PARTITION_KEY_HEADER, message -> getPartitionKey(message)))
            .log(Level.INFO, m -> format("START PROCESSING MESSAGE (partitionKey: %s)", m.getHeaders().get(PARTITION_KEY_HEADER)))
            .transform(....)
            .handle(outboundHandler)
            .get();
}

The payload of each received message contains a dynamic partition key (I don't know all the possible keys in advance).
How can I use the partition key to process messages in parallel, one stream of work per partition key? I must also preserve message order within each partition key: the first message received for a given key must be the first one sent to Kinesis.
I've read about the ExecutorChannel, but I don't understand how to route an incoming message to a specific thread that is used only for that partition key.
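
To make the requirement concrete, here is a minimal sketch in plain Java (not Spring Integration) of the behaviour I'm after. `KeyedExecutor` and its methods are hypothetical names of my own, not an existing API, and the fixed pool size is an assumption: each partition key is hashed onto one of N single-threaded executors, so messages with the same key always run on the same thread in submission order, while different keys can run in parallel.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper (not part of Spring Integration): routes each task to
 * one of N single-threaded executors chosen by hashing the partition key.
 */
public class KeyedExecutor {

    private final ExecutorService[] workers;

    public KeyedExecutor(int parallelism) {
        workers = new ExecutorService[parallelism];
        for (int i = 0; i < parallelism; i++) {
            // One thread per worker, so tasks on a worker run strictly in FIFO order.
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Tasks submitted with the same key always land on the same worker. */
    public void submit(String partitionKey, Runnable task) {
        // floorMod keeps the index non-negative even for negative hash codes.
        int index = Math.floorMod(partitionKey.hashCode(), workers.length);
        workers[index].execute(task);
    }

    /** Stops accepting tasks and waits for queued ones; returns false on timeout or interrupt. */
    public boolean shutdownAndAwait(long timeoutSeconds) {
        for (ExecutorService worker : workers) {
            worker.shutdown();
        }
        try {
            for (ExecutorService worker : workers) {
                if (!worker.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With this approach several keys may share a worker, so the number of threads stays bounded even though the set of keys is unknown. What I'm missing is the equivalent of `submit(partitionKey, task)` inside the integration flow.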