
I'm experiencing severe throughput issues with spring-integration-aws's SqsMessageDrivenChannelAdapter, which is a Spring Integration abstraction around spring-cloud-aws's SimpleMessageListenerContainer.

The issue, it seems, is that SimpleMessageListenerContainer can only request 10 messages at a time (an AWS limitation) and makes those requests very slowly. In particular, I'm observing ~15 TPS, which is far too slow.

Here's the code I have:

import com.amazonaws.services.sqs.AmazonSQSAsync;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class SqsConfiguration {
    // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html#API_ReceiveMessage_RequestParameters
    private static final int SQS_RECEIVE_MAX_NUM_MESSAGES = 10;

    @Autowired
    private AmazonSQSAsync amazonSQSAsync;

    @Bean
    ThreadPoolTaskExecutor sqsThreadPoolTaskExecutor() {
        final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        return threadPoolTaskExecutor;
    }

    @Bean
    MessageChannel receivedSqsMessageChannel() {
        return new ExecutorChannel(sqsThreadPoolTaskExecutor());
    }

    @Bean
    SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter() {
        final SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter = new SqsMessageDrivenChannelAdapter(amazonSQSAsync, "...");
        sqsMessageDrivenChannelAdapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NEVER);
        sqsMessageDrivenChannelAdapter.setOutputChannel(receivedSqsMessageChannel());
        sqsMessageDrivenChannelAdapter.setMaxNumberOfMessages(SQS_RECEIVE_MAX_NUM_MESSAGES);
        return sqsMessageDrivenChannelAdapter;
    }
}

receivedSqsMessageChannel is the input to a flow that eventually ends with the message's Acknowledgment#acknowledge() being called.

SimpleMessageListenerContainer (and therefore SqsMessageDrivenChannelAdapter) does not yet support polling on multiple threads. There are discussions about adding this to spring-cloud-aws, but it is far from being implemented, and spring-integration-aws would then still need to introduce support for it. Since the TPS I have right now is unacceptable, I reason that the best "fix" for now is to have multiple SqsMessageDrivenChannelAdapters polling concurrently and outputting to receivedSqsMessageChannel.
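To make the idea concrete, here is a minimal plain-Java sketch of several pollers feeding one shared channel. It is only a model of the pattern, not Spring or AWS code: ConcurrentPollersSketch, receiveBatch, and drain are hypothetical names, the in-memory queue stands in for SQS, and the BlockingQueue stands in for receivedSqsMessageChannel. Unlike a real adapter, each poller here stops once the stand-in queue is empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ConcurrentPollersSketch {
    // Same per-request cap as the SQS ReceiveMessage limit mentioned above.
    static final int MAX_BATCH = 10;

    // Hypothetical stand-in for one receive call: returns up to `max` messages.
    static List<String> receiveBatch(ConcurrentLinkedQueue<String> remote, int max) {
        List<String> batch = new ArrayList<>();
        String message;
        while (batch.size() < max && (message = remote.poll()) != null) {
            batch.add(message);
        }
        return batch;
    }

    // Runs `pollers` threads that all drain `remote` into one shared channel.
    static BlockingQueue<String> drain(ConcurrentLinkedQueue<String> remote, int pollers)
            throws InterruptedException {
        BlockingQueue<String> sharedChannel = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(pollers);
        for (int i = 0; i < pollers; i++) {
            pool.submit(() -> {
                List<String> batch;
                // Each poller keeps fetching batches until the stand-in queue is empty.
                while (!(batch = receiveBatch(remote, MAX_BATCH)).isEmpty()) {
                    sharedChannel.addAll(batch);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return sharedChannel;
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentLinkedQueue<String> remote = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 95; i++) {
            remote.add("message-" + i);
        }
        BlockingQueue<String> channel = drain(remote, 4);
        System.out.println("delivered " + channel.size() + " messages");
    }
}
```

The point of the model is that the per-request cap of 10 stays in place while the number of in-flight requests scales with the number of pollers.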

How can I achieve this in Spring? Is there a way I can have a pool of SqsMessageDrivenChannelAdapters running concurrently using Spring annotations?

Artem Bilan
Your performance could be impacted by the fact that the listener waits for all message-processing worker threads to finish before fetching more messages off the queue (see https://github.com/spring-cloud/spring-cloud-aws/issues/166). The recommendation is to make your handler code asynchronous. That way the @SqsListener method returns immediately, and the countdown latch reaches 0 very quickly. – Troup Dec 04 '20 at 10:34
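The effect described in this comment can be modelled in plain Java. The sketch below is an assumption-laden illustration, not spring-cloud-aws's actual implementation: AsyncHandoffSketch, runBatch, and slowWork are hypothetical names, and the `gate` latch simply stands in for slow processing. One fetch cycle dispatches a batch and waits on a latch until every handler has returned; a handler that hands its work to another executor lets the cycle finish while the work is still pending.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class AsyncHandoffSketch {
    // Models one fetch cycle: dispatch each message to a worker, then wait
    // (up to timeoutMs) for every handler call to return.
    static boolean runBatch(int batchSize, Consumer<Integer> handler, long timeoutMs)
            throws InterruptedException {
        ExecutorService workers = Executors.newFixedThreadPool(batchSize);
        CountDownLatch latch = new CountDownLatch(batchSize);
        for (int i = 0; i < batchSize; i++) {
            final int id = i;
            workers.submit(() -> {
                try {
                    handler.accept(id);
                } finally {
                    latch.countDown();
                }
            });
        }
        boolean finished = latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        workers.shutdown();
        return finished;
    }

    // Slow processing is modelled by blocking until `gate` is opened.
    static void slowWork(CountDownLatch gate, AtomicInteger processed) {
        try {
            gate.await();
            processed.incrementAndGet();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger processed = new AtomicInteger();
        ExecutorService background = Executors.newFixedThreadPool(4);

        // Blocking handler: the cycle cannot finish while the work is pending.
        boolean blocking = runBatch(10, id -> slowWork(gate, processed), 200);
        // Async handler: hands the work off and returns immediately.
        boolean async = runBatch(10,
                id -> background.submit(() -> slowWork(gate, processed)), 5000);
        System.out.println("blocking cycle finished: " + blocking);
        System.out.println("async cycle finished: " + async);

        gate.countDown(); // let the pending work complete
        background.shutdown();
        background.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Note that with an asynchronous handler the cycle finishing no longer means the messages were processed, so acknowledgement has to happen when the handed-off work actually completes.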
