I am replacing a JMS processor with an SQS processor. When I receive a message, I need to make update calls into multiple third-party systems. I am not sure how to handle the case where I successfully retrieve an SQS message but one or more of the calls to update the third-party systems fail. In the JMS world, we would throw an exception and the broker would redeliver the same JMS message with an incremental back-off, eventually moving it to the DLQ after 4 retries. Here is the code I have so far.
I have the following helper method to consolidate message retrieval:
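import java.util.ArrayList;
import java.util.List;

import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

// Long-polls the queue once; returns an empty list when no messages arrived.
public static List<Message> receiveMessage(SqsClient sqs, String queueUrl) throws AwsException {
    try {
        ReceiveMessageRequest req = ReceiveMessageRequest.builder()
                .queueUrl(queueUrl)
                .waitTimeSeconds(LONG_POLL_DURATION)
                .build();
        ReceiveMessageResponse resp = sqs.receiveMessage(req);
        if (resp == null || resp.messages() == null || resp.messages().isEmpty()) {
            return new ArrayList<>();
        }
        return resp.messages();
    } catch (SdkException e) {
        throw new AwsException("An error occurred receiving SQS message: " + e.getMessage(), e);
    }
}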
My code for processing the SQS messages:
try (SqsClient client = SqsUtil.getClient()) {
    while (!shutdown) {
        List<Message> messages = SqsUtil.receiveMessage(client, queueUrl);
        for (Message msg : messages) {
            boolean errorOccurred = false;
            try {
                // Convert SQS message to System 1 request
                System1Request req1 = convert(msg);
                system1Client.process(req1);
            } catch (System1Exception e) {
                // log error
                errorOccurred = true;
            }
            try {
                // Convert SQS message to System 2 request
                System2Request req2 = convert(msg);
                system2Client.process(req2);
            } catch (System2Exception e) { // System 2 failures, not System1Exception
                // log error
                errorOccurred = true;
            }
            if (!errorOccurred) {
                // Both systems updated: delete the message so SQS does not redeliver it
                client.deleteMessage(b -> b.queueUrl(queueUrl).receiptHandle(msg.receiptHandle()));
            } else {
                // TODO: how do I re-process the message using SQS?
            }
        }
    }
}
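The per-message part of the loop can be separated from the SQS plumbing, which makes the delete-or-retry decision easy to test on its own. This is a minimal sketch, assuming each third-party update can be wrapped as a named step; `MessageProcessor`, `Step` and `failedSteps` are hypothetical names, and the converters and clients from the loop are stood in for by lambdas.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: runs every third-party update for one message and reports which ones failed. */
final class MessageProcessor {

    /** One third-party update; throws on failure (e.g. System1Exception in the loop). */
    @FunctionalInterface
    interface Step {
        void run() throws Exception;
    }

    // Insertion order is preserved so systems are updated in a predictable order.
    private final Map<String, Step> steps = new LinkedHashMap<>();

    MessageProcessor add(String name, Step step) {
        steps.put(name, step);
        return this;
    }

    /**
     * Runs all steps, continuing after a failure so every system gets its update attempt.
     * Returns the names of the failed steps; an empty list means the SQS message can be deleted.
     */
    List<String> failedSteps() {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Step> e : steps.entrySet()) {
            try {
                e.getValue().run();
            } catch (Exception ex) {
                // log ex here; the message is left undeleted so SQS can redeliver it
                failed.add(e.getKey());
            }
        }
        return failed;
    }
}
```

One consequence of this design: SQS redelivers the whole message, so a retry calls System 1 again even if only System 2 failed. The updates would therefore need to be idempotent, or the processor would need to remember which steps already succeeded.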
From my understanding, the SQS client has retry capabilities built into the SDK, but I don't think those apply to in-flight messages? I don't see anything in the API for putting the message back onto the SQS queue. I am hesitant to build the retry logic into my application in case the pod restarts and the message is lost.
I understand that if I don't delete the message, it will eventually be moved to the DLQ, but I would like to attempt a few retries before the message is sent there. The SQS example code doesn't show how to handle this, and the AWS documentation seems very fragmented.
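The JMS behaviour (4 retries with growing delays, then the DLQ) could plausibly be expressed with two SQS features. The first is a redrive policy whose `maxReceiveCount` is 5, covering the first delivery plus four redeliveries. The second is a `ChangeMessageVisibility` call on a failed message instead of a delete, so that it reappears after a chosen delay. The delay can be derived from the message's `ApproximateReceiveCount` system attribute. In SDK v2 that attribute is returned in `Message.attributes()` under `MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT`, but only when the receive request asks for it. Below is a minimal, SDK-free sketch of such a policy. The class name, the 30-second base delay and the doubling factor are assumptions, not SQS defaults.

```java
/**
 * Hypothetical retry policy mirroring "4 retries with incremental back-off, then DLQ".
 * receiveCount is the message's ApproximateReceiveCount (1 on the first delivery).
 */
final class SqsBackoffPolicy {
    // Must match maxReceiveCount in the queue's RedrivePolicy: 1 delivery + 4 retries.
    static final int MAX_RECEIVE_COUNT = 5;
    // Assumed starting delay; doubled on each further attempt.
    static final int BASE_DELAY_SECONDS = 30;
    // SQS limits a visibility timeout to 12 hours.
    static final int MAX_VISIBILITY_TIMEOUT_SECONDS = 43_200;

    private SqsBackoffPolicy() {}

    /** Visibility timeout (seconds) to pass to ChangeMessageVisibility after a failed attempt. */
    static int visibilityTimeoutAfterFailure(int receiveCount) {
        if (receiveCount < 1) {
            throw new IllegalArgumentException("receiveCount must be >= 1");
        }
        // 30, 60, 120, 240, ... capped at 12 hours; the shift is bounded to avoid overflow.
        long delay = (long) BASE_DELAY_SECONDS << Math.min(receiveCount - 1, 30);
        return (int) Math.min(delay, MAX_VISIBILITY_TIMEOUT_SECONDS);
    }

    /** True if this failed attempt was the last one before SQS moves the message to the DLQ. */
    static boolean isLastAttempt(int receiveCount) {
        return receiveCount >= MAX_RECEIVE_COUNT;
    }

    /** Value for the queue's RedrivePolicy attribute; the DLQ ARN is supplied by the caller. */
    static String redrivePolicyJson(String deadLetterQueueArn) {
        return "{\"deadLetterTargetArn\":\"" + deadLetterQueueArn
                + "\",\"maxReceiveCount\":\"" + MAX_RECEIVE_COUNT + "\"}";
    }
}
```

Under these assumptions, the message never leaves SQS between attempts. A failed message is only made invisible for a while, so a pod restart would not lose it, and SQS itself performs the move to the DLQ once the receive count exceeds `maxReceiveCount`.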
Do I need to build a second queuing mechanism to deal with this case? Am I misreading how the retry mechanisms work?