I have a Spring Boot application and want to receive messages from multiple AWS SQS queues. Each queue has its own credentials (and sadly there is nothing I can do about that). None of these credentials can access any of the other queues; each is restricted to exactly one queue.
With only one queue and one set of credentials, it is simple. I just have to provide the credentials as an AWSCredentialsProvider bean and annotate my method with @SqsListener / @EnableSqs.
But I cannot figure out how to do it with multiple credentials.
The @SqsListener annotation has no way to provide credentials, a preconfigured AmazonSqs object, or anything else that would help.
I searched for a way to map a queue to credentials by extending the CredentialsProvider or the AmazonSqs client, but to no avail.
I even tried to inject the credentials into the header of the AmazonHttpClient, but that was not possible either.
I tried to create everything needed to listen to an SQS queue manually.
But I'm stuck at creating a MessageHandler for the SimpleMessageListenerContainer.
The required QueueMessageHandler only works when created as a bean, with an application context.
Otherwise it won't look up methods annotated with @SqsListener.
Sadly, the only tutorials or examples I could find either use JMS, which I would like to avoid, or just use the @SqsListener annotation with only one queue.
Is there any other way to provide different credentials for multiple queues?
My test code:
@Component
@Slf4j
public class TestOneQueueA {

    public static final String QUEUE_A = "TestOneQueueA";

    public TestOneQueueA(Cloud cloud, ResourceIdResolver resourceIdResolver) {
        // Build a client that uses only the credentials belonging to this queue
        SqsServiceInfo serviceInfo = (SqsServiceInfo) cloud.getServiceInfo(QUEUE_A);
        AWSStaticCredentialsProvider credentialsProvider =
                new AWSStaticCredentialsProvider(new BasicAWSCredentials(serviceInfo.getAccessKey(),
                        serviceInfo.getSecretAccessKey()));
        AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.standard()
                .withCredentials(credentialsProvider)
                .withRegion(serviceInfo.getRegion()).build();

        QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
        queueMessageHandlerFactory.setAmazonSqs(client);
        queueMessageHandlerFactory.setMessageConverters(Collections.singletonList(new MappingJackson2MessageConverter()));
        QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
        queueMessageHandler.afterPropertiesSet(); // won't do anything because of no ApplicationContext

        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(client);
        factory.setResourceIdResolver(resourceIdResolver);
        factory.setQueueMessageHandler(queueMessageHandler);
        SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
        simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
        try {
            simpleMessageListenerContainer.afterPropertiesSet();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        simpleMessageListenerContainer.start();
        simpleMessageListenerContainer.start(QUEUE_A); // fails with "Queue with name 'TestOneQueueA' does not exist"
    }

    @SqsListener(value = QUEUE_A, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    public void receiveMessage(@NotificationMessage TestDto dto, @NotificationSubject String subject) {
        // SLF4J uses {} placeholders, not %s
        log.info("Received SQS Message: \nSubject: {} \n{}", subject, dto);
    }
}
Edit:
After trying some more, I was able to inject my AmazonSQS clients into two separate SimpleMessageListenerContainer instances. The problem then became the QueueMessageHandler.
If I create it manually, without a bean context, it simply won't look for any methods with the @SqsListener annotation. And there is no way to set the handlers manually.
If I create it as a bean, it will look at every bean for the annotation. So it will also find the listener method of the queue it is not supposed to handle, and then it will crash because its credentials don't work for that queue.
I cannot figure out a way to create a QueueMessageHandler for only a single SqsListener method.
And SimpleMessageListenerContainer won't take anything except a QueueMessageHandler.
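To make the goal concrete, the behavior I am looking for can be sketched without any AWS or Spring classes. This is only an illustration of the scanning logic, not a working solution: the @QueueListener annotation and the ScopedHandler class are hypothetical stand-ins that I made up for this example and are not part of Spring Cloud AWS. The idea is a handler that looks at all candidate beans but registers only the methods bound to its own queue.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for @SqsListener; NOT part of Spring Cloud AWS.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface QueueListener {
    String value();
}

// One listener class per queue, mirroring TestOneQueueA and a second queue.
class ListenerA {
    final List<String> received = new ArrayList<>();

    @QueueListener("TestOneQueueA")
    public void receiveMessage(String body) {
        received.add(body);
    }
}

class ListenerB {
    final List<String> received = new ArrayList<>();

    @QueueListener("TestOneQueueB")
    public void receiveMessage(String body) {
        received.add(body);
    }
}

// Hypothetical handler: scans every candidate, keeps only methods for one queue.
class ScopedHandler {
    private final List<Object> targets = new ArrayList<>();
    private final List<Method> methods = new ArrayList<>();

    ScopedHandler(String queueName, List<Object> candidates) {
        for (Object candidate : candidates) {
            for (Method method : candidate.getClass().getDeclaredMethods()) {
                QueueListener annotation = method.getAnnotation(QueueListener.class);
                // Skip listener methods that belong to a different queue
                if (annotation != null && annotation.value().equals(queueName)) {
                    method.setAccessible(true);
                    targets.add(candidate);
                    methods.add(method);
                }
            }
        }
    }

    int handlerCount() {
        return methods.size();
    }

    // Invokes every registered listener method with the message body
    void dispatch(String body) {
        for (int i = 0; i < methods.size(); i++) {
            try {
                methods.get(i).invoke(targets.get(i), body);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }
    }
}

class ScopedScanDemo {
    public static void main(String[] args) {
        ListenerA a = new ListenerA();
        ListenerB b = new ListenerB();
        ScopedHandler handlerA = new ScopedHandler("TestOneQueueA", Arrays.<Object>asList(a, b));
        handlerA.dispatch("hello");
        System.out.println(handlerA.handlerCount() + " " + a.received + " " + b.received);
    }
}
```

In this sketch the handler for TestOneQueueA never touches ListenerB, which is the property I would need from a QueueMessageHandler so that each container only uses the credentials of its own queue.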