
I have a simple spring integration flow receiving a message from an AmqpInboundChannelAdapter. After some transformations, the messages must be put into an AWS Kinesis data stream.

@Bean
public AmqpInboundChannelAdapter inboundChannelAdapter(
        ConnectionFactory connectionFactory,
        @Qualifier(INPUT_CHANNEL_NAME) MessageChannel inputChannel) {

    return Amqp.inboundAdapter(connectionFactory, "amqpInputQueue")
            .configureContainer(c -> {
                c.acknowledgeMode(AcknowledgeMode.MANUAL);
                c.concurrentConsumers(1);
                c.prefetchCount(250);
                c.defaultRequeueRejected(false);
            })
            .outputChannel(inputChannel)
            .get();
}

@Bean
public IntegrationFlow myFlow(
        @Qualifier(INPUT_CHANNEL_NAME) MessageChannel input,
        @Qualifier(OUTBOUND_HANDLER) MessageHandler outboundHandler) {

    return IntegrationFlows
            .from(input)
            .transform(....)
            .enrichHeaders(h -> h.headerFunction(PARTITION_KEY_HEADER, message -> getPartitionKey(message)))
            .log(Level.INFO, m -> format("START PROCESSING MESSAGE (partitionKey: %s)", m.getHeaders().get(PARTITION_KEY_HEADER)))
            .transform(....)
            .handle(outboundHandler)
            .get();
}

The received messages contain, in the payload, a dynamic partition key (I don't know all the possible keys in advance).

How can I use the partition key to process the messages in parallel, per partition key? Moreover, I must maintain the order of the messages per partition key: the first message received for a given partition key must be the first one sent to Kinesis.

I've read about the ExecutorChannel, but I haven't understood how to direct an incoming message to a specific thread dedicated to its partition key.

roby.s

1 Answer


There is no way to choose a specific thread from a pool, and there is no guarantee that you can have as many threads as there are partition keys.

However, we can come up with a solution like this:

  1. You declare an ExecutorChannel backed by a single-thread executor - Executors.newSingleThreadExecutor().

  2. You declare as many beans of this kind of ExecutorChannel as you think would be enough for your concurrency needs.

  3. All of them can be marked with @BridgeTo to send their messages to a common processing endpoint.

  4. Then you use a Router to determine an ExecutorChannel bean name by your partition key.

This way all the messages with the same partition key are going to be handled by the same thread in the respective ExecutorChannel and all downstream operations for that message are going to happen exactly on this thread.
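The ordering guarantee relies on the router's key-to-channel mapping being deterministic. A plain-Java illustration of such a mapping (no Spring involved; the partitionChannel naming and the hash-modulo scheme are just assumptions for this sketch) shows that the same key always resolves to the same channel name, and therefore to the same single-thread executor:

```java
public class PartitionRouting {

    // Deterministic key -> channel-bean-name mapping: the same partition key
    // always yields the same index, so its messages always land on the same
    // single-thread ExecutorChannel. floorMod keeps the index non-negative.
    public static String channelFor(String partitionKey, int poolSize) {
        return "partitionChannel" + Math.floorMod(partitionKey.hashCode(), poolSize);
    }
}
```

Note that with a fixed pool, distinct keys can share a channel (hash collisions); that is harmless for ordering, it only bounds the parallelism to the pool size.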

There is no way to declare all of this within a single Java DSL flow, but there doesn't need to be: the router is a terminal endpoint of one IntegrationFlow, and all the ExecutorChannel instances @BridgeTo the input channel of another IntegrationFlow.
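Putting the steps together, the configuration could look roughly like this. It is a sketch, not a drop-in: the pool size of 3, the partitionChannelN bean names, the hash-modulo routing, and the reuse of the question's INPUT_CHANNEL_NAME, PARTITION_KEY_HEADER and OUTBOUND_HANDLER constants are all assumptions; the question's transform/enrich steps are omitted for brevity.

```java
import java.util.concurrent.Executors;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.BridgeTo;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
public class PartitionedKinesisConfig {

    private static final int POOL_SIZE = 3;

    // Steps 1-3: single-thread ExecutorChannel beans; @BridgeTo forwards
    // everything arriving on them to the common processing flow's input.
    @Bean
    @BridgeTo("processingFlow.input")
    public MessageChannel partitionChannel0() {
        return new ExecutorChannel(Executors.newSingleThreadExecutor());
    }

    @Bean
    @BridgeTo("processingFlow.input")
    public MessageChannel partitionChannel1() {
        return new ExecutorChannel(Executors.newSingleThreadExecutor());
    }

    @Bean
    @BridgeTo("processingFlow.input")
    public MessageChannel partitionChannel2() {
        return new ExecutorChannel(Executors.newSingleThreadExecutor());
    }

    // Step 4: the router is the terminal endpoint of the first flow; it maps
    // the partition key header onto one of the channel bean names above.
    @Bean
    public IntegrationFlow routingFlow(@Qualifier(INPUT_CHANNEL_NAME) MessageChannel input) {
        return IntegrationFlows
                .from(input)
                .route(Message.class, m -> "partitionChannel"
                        + Math.floorMod(m.getHeaders().get(PARTITION_KEY_HEADER).hashCode(), POOL_SIZE))
                .get();
    }

    // The second flow: every ExecutorChannel bridges into "processingFlow.input",
    // so the Kinesis handler runs on the partition's dedicated thread.
    @Bean
    public IntegrationFlow processingFlow(@Qualifier(OUTBOUND_HANDLER) MessageHandler outboundHandler) {
        return f -> f.handle(outboundHandler);
    }
}
```

The "processingFlow.input" channel name follows the Java DSL convention of naming a lambda flow's input channel after the flow bean; if you rename the bean, the @BridgeTo values must change accordingly.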

Artem Bilan