need help

I need to create multiple SQS queue consumers that execute in parallel, but I don't know how to achieve this using Spring Integration.

I have the following architecture

An Amazon SQS queue with 200k messages

An Amazon stack with 5 EC2 instances, each running a Tomcat server with a Spring Boot application whose Spring Integration flow consumes the SQS messages using sqs-message-driven-channel-adapter from spring-integration-aws (https://github.com/spring-projects/spring-integration-aws)

and publishes those messages to a REST service that has an average response time of 1 second (I cannot modify the REST service, that is a constraint, but I can send messages in parallel)

SQS queue -> Stack(5 tomcat instances) -> Rest Service

Constraints

Amazon SQS allows a client to read messages in batches of at most 10 messages per request, but I can have multiple clients consuming messages in parallel.

In Amazon SQS, messages need to be deleted manually. This is done using Spring Integration; I delete a message only if the REST service returns OK.

Possible duplicates are not a problem for me (SQS may send the same message to two different clients).

I cannot store messages in my Spring Boot application in any way.

My Spring Integration flow

<aws-messaging:sqs-async-client id="clientWithCredentials"/>
<int-aws:sqs-message-driven-channel-adapter
  sqs="clientWithCredentials" 
  channel="channel_1"
  queues="https://sqs.us-east-1.amazonaws.com/123456789000/SomeAmazonSQSName"
  max-number-of-messages="10"/>

<int:channel id="channel_1" />
<int:outbound-channel-adapter ref="restService" method="publish" channel="channel_1" />

How can I execute this flow on multiple threads so that more messages are consumed in parallel?

I tried to put <int:poller fixed-rate="1" task-executor="executor" /> inside sqs-message-driven-channel-adapter, but that is not allowed.

1 Answer

To achieve this, you can use an ExecutorChannel instead of the default DirectChannel.

This way, all the SQS messages are handed off to the threads supplied by the ExecutorChannel's task executor and are therefore processed in parallel.

More info about an ExecutorChannel is in the Reference Manual.

UPDATE

So, what I suggest would look like this in your current config:

<int:channel id="channel_1">
   <int:dispatcher task-executor="someExecutor"/>
</int:channel>
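
The someExecutor bean referenced above is not defined in the snippet. As a minimal sketch, assuming the standard Spring `task` namespace is available, it could be declared as follows. The pool size, the zero queue capacity, and the rejection policy are illustrative assumptions, not values from the original question or answer:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         https://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/integration
         https://www.springframework.org/schema/integration/spring-integration.xsd
         http://www.springframework.org/schema/task
         https://www.springframework.org/schema/task/spring-task.xsd">

    <!-- Assumed values: tune pool-size to the REST service's capacity.
         queue-capacity="0" means tasks are not buffered inside the executor.
         CALLER_RUNS makes the submitting thread run the task when the pool is full. -->
    <task:executor id="someExecutor"
                   pool-size="20"
                   queue-capacity="0"
                   rejection-policy="CALLER_RUNS"/>

    <!-- Same channel as above, now dispatching on the executor's threads -->
    <int:channel id="channel_1">
        <int:dispatcher task-executor="someExecutor"/>
    </int:channel>

</beans>
```

With a zero queue capacity the executor hands each message straight to a worker thread instead of holding it in an in-memory queue, which seems in keeping with the "cannot store messages" constraint from the question. Whether that is the right trade-off depends on your throughput needs.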

UPDATE

If you still insist on having several SQS adapters, a simplified version looks like this:

<int-aws:sqs-message-driven-channel-adapter
    sqs="sqsAsyncClient" 
    channel="sqs-to-metricator"
    queues="https://sqs.us-east-1.amazonaws.com/123/SomeSQSQueueName"
    max-number-of-messages="10"
    />


<int-aws:sqs-message-driven-channel-adapter
    sqs="sqsAsyncClient" 
    channel="sqs-to-metricator"
    queues="https://sqs.us-east-1.amazonaws.com/123/SomeSQSQueueName"
    max-number-of-messages="10"
    />

<int-aws:sqs-message-driven-channel-adapter
    sqs="sqsAsyncClient" 
    channel="sqs-to-metricator"
    queues="https://sqs.us-east-1.amazonaws.com/123/SomeSQSQueueName"
    max-number-of-messages="10"
    />

<int:channel id="sqs-to-metricator" />

<int:outbound-channel-adapter ref="restService"
    method="publish" channel="sqs-to-metricator" />

Also, to avoid this duplication, you can consider switching to the Java DSL and using its IntegrationFlowContext for dynamic IntegrationFlow registrations: https://docs.spring.io/spring-integration/docs/5.0.4.RELEASE/reference/html/java-dsl.html#java-dsl-runtime-flows

Artem Bilan
  • I need a pool of sqs-message-driven-channel-adapter instances to request more messages from SQS, each one with its own thread – Alexander Pinzon Fernandez Apr 12 '18 at 20:29
  • I don't understand why. A single `sqs-message-driven-channel-adapter` with an `ExecutorChannel` as its output should be fully enough. – Artem Bilan Apr 12 '18 at 20:48
  • sqs-message-driven-channel-adapter can only read 10 messages at a time from the Amazon queue (a constraint from Amazon). This flow is going to read messages from SQS again only when those 10 messages have been stored in restService, maybe after a couple of seconds. I want to read more than 10 messages at nearly the same time using multiple sqs-message-driven-channel-adapter instances – Alexander Pinzon Fernandez Apr 12 '18 at 20:57
  • Hm. Still not clear why `ExecutorChannel` doesn't help... Can you explain this part, please ? – Artem Bilan Apr 12 '18 at 21:08
  • See some UPDATEs in my answer. – Artem Bilan Apr 12 '18 at 21:13
  • I don't understand how an Executor Channel would make any difference. The org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer class has an inner class called AsynchronousMessageListener. This is what appears to do the heavy lifting in getting messages. It latches on the message count (max of 10) and requires the MessageExecutor to have handled the message somehow. So, it can't possibly go and get another batch of messages no matter how the Channel is defined. Can you explain why it would please? – Steve Sep 14 '18 at 09:45
  • Huh? Directly proportional. The `SqsMessageDrivenChannelAdapter` just sends messages to its output channel in the internal `QueueMessageHandler` implementation. So, if we just shift messages there into another thread via the mentioned `ExecutorChannel`, we free the current thread from the `SimpleMessageListenerContainer` and let it release the mentioned `latch`. Please read the source code, starting from the `SqsMessageDrivenChannelAdapter`: how it relies on the `SimpleMessageListenerContainer` and what it does via its `sendMessage()`. – Artem Bilan Sep 14 '18 at 13:11
  • I tried, but it doesn't work. I get 10 messages and that's it until they finish the flow (there's a delay as I aggregate the messages based on content using rules that AWS can't handle). Also, I can't see the sendMessage call in that class. What version are you using? I have 2.0.0-release. – Steve Sep 14 '18 at 15:26
  • Sounds like you need to raise a new SO question with more details. – Artem Bilan Sep 14 '18 at 15:47
  • Yeah, I might do that... but thanks for your comments so far, this answer is the closest I've found to something that might help. – Steve Sep 14 '18 at 15:51
  • I raised it here if you're curious (and if you have further insight, even better). https://stackoverflow.com/questions/52336006/how-to-process-more-than-10-concurrent-messages-from-an-aws-sqs-fifo-queue-using – Steve Sep 14 '18 at 16:52