I'm experiencing some really bad throughput issues with spring-integration-aws
's SqsMessageDrivenChannelAdapter, which is Spring Integration abstraction around spring-cloud-aws
's SimpleMessageListenerContainer
.
The issue, it seems, is that SimpleMessageListenerContainer
can only request 10 messages at a time (an AWS limitation), and makes those requests incredibly slowly—in particular, I'm observing ~15 tps, which is way too slow.
Here's the code I have:
import com.amazonaws.services.sqs.AmazonSQSAsync;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class SqsConfiguration {
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html#API_ReceiveMessage_RequestParameters
private static final int SQS_RECEIVE_MAX_NUM_MESSAGES = 10;
@Autowired
private AmazonSQSAsync amazonSQSAsync;
@Bean
ThreadPoolTaskExecutor sqsThreadPoolTaskExecutor() {
final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
return threadPoolTaskExecutor;
}
@Bean
MessageChannel receivedSqsMessageChannel() {
return new ExecutorChannel(sqsThreadPoolTaskExecutor());
}
@Bean
SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter() {
final SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter = new SqsMessageDrivenChannelAdapter(amazonSQSAsync, "...");
sqsMessageDrivenChannelAdapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NEVER);
sqsMessageDrivenChannelAdapter.setOutputChannel(receivedSqsMessageChannel());
sqsMessageDrivenChannelAdapter.setMaxNumberOfMessages(SQS_RECEIVE_MAX_NUM_MESSAGES);
return sqsMessageDrivenChannelAdapter;
}
}
receivedSqsMessageChannel
is the input to a flow which eventually ends in the message's Acknowledgement#acknowledge()
being called.
Given there's not yet any feature of SimpleMessageListenerContainer
(and therefore SqsMessageDrivenChannelAdapter
) to poll on multiple threads (there are discussions for spring-cloud-aws
, but it's far from close to being implemented, and after that, for spring-integration-aws
to introduce support for it), and the TPS I have right now is unacceptable, I reason the best "fix" for now is to have multiple SqsMessageDrivenChannelAdapter
s polling concurrently and outputting to receivedSqsMessageChannel
.
How can I achieve this in Spring? Is there a way I can have a pool of SqsMessageDrivenChannelAdapter
s running concurrently using Spring annotations?