I am replacing a JMS processor with an SQS processor. When I receive a message, I need to make update calls to multiple third-party systems. I am not sure how to handle the case where the SQS message is retrieved successfully but one or more of the calls to the third-party systems fail. In the JMS world, we would throw an exception and the broker would re-send the same JMS message with an incremental back-off, eventually moving it to the DLQ after 4 retries. Here is the code I have so far.

I have the following helper method to consolidate the message retrieval:

public static List<Message> receiveMessage(SqsClient sqs, String queueUrl) throws AwsException {
  try {
    ReceiveMessageRequest req = ReceiveMessageRequest.builder()
                                                     .queueUrl(queueUrl)
                                                     .waitTimeSeconds(LONG_POLL_DURATION)
                                                     .build();
    ReceiveMessageResponse resp = sqs.receiveMessage(req);
    // Return an empty list (never null) when nothing was received
    if (resp == null || Collections.isNullOrEmpty(resp.messages())) {
      return new ArrayList<>();
    }
    return resp.messages();
  } catch (SdkException e) {
    throw new AwsException("An error occurred receiving SQS message: " + e.getMessage(), e);
  }
}

My code for processing the SQS messages:

try (SqsClient client = SqsUtil.getClient()) {
  while(!shutdown) {
    List<Message> messages = SqsUtil.receiveMessage(client, queueUrl);
    if (!messages.isEmpty()) {
      for(Message msg : messages) {
        boolean errorOccurred = false;
        try {
          //Convert SQS message to System 1 Request
          System1Request req1 = convert(msg);
          system1Client.process(req1);
        } catch (System1Exception e) {
          //log error
          errorOccurred = true;
        }
        try {
          //Convert SQS message to System 2 Request
          System2Request req2 = convert(msg);
          system2Client.process(req2);
        } catch (System2Exception e) {
          //log error
          errorOccurred = true;
        }
        if (!errorOccurred) {
          //delete SQS message
        } else {
          //TODO: how do I re-process the message using SQS
        }
      }
    }
  }
}

From my understanding, the SQS client has retry capabilities built into the SDK, but I didn't think those applied to in-flight messages. I don't see anything in the API for putting a message back onto the SQS queue. I am hesitant to build the retry logic into my application, because the message would be lost if the pod restarted in the middle of a retry.

I understand that if I don't delete the message it will eventually be transferred to the DLQ, but I would like to attempt a few retries before that happens. The SQS example code doesn't show how to handle this, and the AWS documentation seems very fragmented.

Do I need to build a second queuing mechanism to deal with this case? Am I misreading how the retry mechanisms work?

Mike

1 Answer

I think the key concept you're missing is the SQS visibility timeout:

When a consumer receives and processes a message from a queue, the message remains in the queue. Amazon SQS doesn't automatically delete the message. Because Amazon SQS is a distributed system, there's no guarantee that the consumer actually receives the message (for example, due to a connectivity issue, or due to an issue in the consumer application). Thus, the consumer must delete the message from the queue after receiving and processing it.

The visibility timeout (configurable in settings) is there to prevent multiple consumers from attempting to process the same message at once (remember, it's not really removed from the queue yet), while providing you certainty that you'll not lose messages.

If a message is not deleted, its visibility timeout will eventually expire, and SQS can return it again (redrive the message) in future receiveMessage calls. If this happens too many times (typically when there is an uncaught exception or something similar during message processing), SQS will move the message to the SQS Dead-Letter Queue (DLQ), if you've configured one.
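To make that lifecycle concrete, here is a toy in-memory model of it. This is only a sketch and does not use the AWS SDK: `ToyQueue`, `ToyMessage`, and the logical clock passed to `receive` are hypothetical names invented for illustration, and real SQS behaviour (ordering, batching, approximate counts) is more involved.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model only; not the AWS SDK. All names here are hypothetical.
final class ToyMessage {
    final String body;
    int receiveCount = 0;     // times this message has been handed to a consumer
    long invisibleUntil = 0;  // logical time until which the message is hidden

    ToyMessage(String body) { this.body = body; }
}

final class ToyQueue {
    private final List<ToyMessage> messages = new ArrayList<>();
    final List<ToyMessage> deadLetters = new ArrayList<>();
    private final int maxReceiveCount;
    private final long visibilityTimeout;

    ToyQueue(int maxReceiveCount, long visibilityTimeout) {
        this.maxReceiveCount = maxReceiveCount;
        this.visibilityTimeout = visibilityTimeout;
    }

    void send(String body) { messages.add(new ToyMessage(body)); }

    // Returns the first visible message, or null if none is available.
    // A message that has already been received maxReceiveCount times is
    // moved to the dead-letter list instead of being returned again.
    ToyMessage receive(long now) {
        Iterator<ToyMessage> it = messages.iterator();
        while (it.hasNext()) {
            ToyMessage m = it.next();
            if (m.invisibleUntil > now) {
                continue; // still in flight with another consumer
            }
            if (m.receiveCount >= maxReceiveCount) {
                it.remove();
                deadLetters.add(m);
                continue;
            }
            m.receiveCount++;
            m.invisibleUntil = now + visibilityTimeout;
            return m;
        }
        return null;
    }

    // The consumer's acknowledgement: only now does the message leave the queue.
    void delete(ToyMessage m) { messages.remove(m); }
}
```

With a receive limit of 1 in this model, a message that is received but never deleted goes straight to the dead-letter list as soon as its timeout expires. With a limit of 3, it is handed out three times first.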

Remember - the key loop here is essentially:

while (keepProcessing()) { 
    Message m = receiveMessage();    // call SQS to get message
    processMessage(m);               // your own logic to process
    deleteMessage(m);                // call SQS to ACK the message
}

...and if an exception gets thrown somewhere in there, that message is not lost - it'll get redriven (based on the DLQ policy and the visibility timeout).
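If a fixed redelivery delay is not enough and you want something closer to the incremental back-off from JMS, one option (not covered in the answer above, so treat it as a suggestion) is to lengthen the visibility timeout of a failed message before giving up on it. SQS exposes this through the ChangeMessageVisibility action and reports how often a message has been received in its ApproximateReceiveCount attribute. The sketch below only computes the delay; `VisibilityBackoff` and its parameters are hypothetical names, and the base and cap values are assumptions you would tune.

```java
// Sketch only: VisibilityBackoff is a hypothetical helper, not part of the AWS SDK.
final class VisibilityBackoff {
    // SQS does not allow a visibility timeout longer than 12 hours.
    static final int MAX_VISIBILITY_SECONDS = 12 * 60 * 60;

    // Returns baseSeconds * 2^(receiveCount - 1), capped at capSeconds
    // and at the SQS maximum.
    static int nextTimeoutSeconds(int receiveCount, int baseSeconds, int capSeconds) {
        if (receiveCount < 1 || baseSeconds < 1 || capSeconds < 1) {
            throw new IllegalArgumentException("all arguments must be >= 1");
        }
        int cap = Math.min(capSeconds, MAX_VISIBILITY_SECONDS);
        long timeout = baseSeconds;
        // Double once per previous receive, stopping as soon as the cap is reached.
        for (int i = 1; i < receiveCount && timeout < cap; i++) {
            timeout *= 2;
        }
        return (int) Math.min(timeout, cap);
    }
}
```

In the failure branch of the processing loop, you would read the message's receive count, call `nextTimeoutSeconds`, and pass the result as the new visibility timeout for that message's receipt handle instead of deleting it. Because the state lives in SQS rather than in the pod, a restart does not lose the message.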

The AWS SDK's built-in retry capabilities (different from redriving messages) apply to the calls you make through the SDK (i.e., receiveMessage, deleteMessage, etc.). They are meant to automatically handle things like intermittent throttling, networking, or service issues.

Processing your message is your own logic, so it's your job to manage any retries around that if you want them (and to determine which types of failures might be solved with a simple retry).
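For failures that a quick second attempt is likely to fix, a small in-process retry around each third-party call can be enough. The helper below is a minimal sketch using only the Java standard library; `LocalRetry`, `withRetries`, and the parameter names are hypothetical. It complements redelivery rather than replacing it: if every attempt fails, the exception propagates, the message is not deleted, and SQS redelivers it later.

```java
import java.util.concurrent.Callable;

// Sketch only: LocalRetry is a hypothetical helper, not part of any library.
final class LocalRetry {
    // Runs the task up to maxAttempts times, pausing between attempts.
    // Rethrows the last exception if every attempt fails.
    static <T> T withRetries(Callable<T> task, int maxAttempts, long pauseMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (InterruptedException e) {
                // Do not retry on interruption; restore the flag and stop.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(pauseMillis);
                }
            }
        }
        throw last;
    }
}
```

Keep the total retry time well below the queue's visibility timeout. Otherwise the message can become visible again and be picked up by another consumer while the first one is still retrying.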

Krease
  • This is the correct answer. I do not have the ability to configure the queues, but the issue was that the Maximum Receives policy was set to 1. Since the message was being sent to the DLQ immediately after the default 30-second visibility timeout, it was never retried. Thank you for the assistance. – Mike Feb 15 '19 at 19:29