139

What is the best practice to move messages from a dead letter queue back to the original queue in Amazon SQS?

Would it be

  1. Get message from DLQ
  2. Write message to queue
  3. Delete message from DLQ

Or is there a simpler way?

Also, will AWS eventually have a tool in the console to move messages off the DLQ?

Matt Dell

14 Answers

163

Here is a quick hack. This is definitely not the best or recommended option.

  1. Set the main SQS queue as the DLQ for the actual DLQ, with Maximum Receives set to 1.
  2. View the content in the DLQ. (This moves the messages to the main queue, since the main queue is now the DLQ of the actual DLQ.)
  3. Remove the setting so that the main queue is no longer the DLQ of the actual DLQ.
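
For reference, step 1 can be done programmatically as well as in the console. A minimal boto3 sketch, assuming hypothetical placeholder queue URL/ARN values:

import json
import boto3

sqs = boto3.client('sqs')

# hypothetical placeholders; substitute your own values
dlq_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-dlq'
main_queue_arn = 'arn:aws:sqs:us-east-1:123456789012:my-main-queue'

# Step 1: make the main queue the DLQ of the actual DLQ, with Maximum Receives 1
sqs.set_queue_attributes(
    QueueUrl=dlq_url,
    Attributes={'RedrivePolicy': json.dumps({
        'deadLetterTargetArn': main_queue_arn,
        'maxReceiveCount': '1'
    })}
)

# Step 2 is just receiving/viewing the DLQ's messages (console polling works);
# step 3 reverses step 1 by setting 'RedrivePolicy': '' again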
Rajkumar
  • 18
    Yeah, this is very much a hack - but a nice option for a quick fix if you know what you are doing and don't have time to solve this the proper way #yolo – Thomas Watson Jul 24 '15 at 10:00
  • 15
    But the receive count is not reset to 0 when you do this. Be Careful. – Rajdeep Siddhapura Feb 16 '17 at 08:35
  • 2
    The right approach is to configure the Redrive Policy in SQS with max receive count and it will automatically move the message to DLQ when it will cross the set receive count, then write a reader thread to read from DLQ. – Ash May 15 '18 at 06:27
  • @RajdeepSiddhapura can you explain what you mean by the receive count not resetting to 0? – Snowman May 26 '18 at 23:01
  • 5
    You're a genius. – JefClaes Jun 08 '18 at 14:22
  • 3
    I created a CLI tool for this problem a few months back: https://github.com/renanvieira/phoenix-letter – MaltMaster Dec 02 '19 at 10:17
  • It sounds like the wrong thing to do, but it is very correct! Thanks – billias Jan 22 '20 at 18:16
  • 1
    This is a great hack, but the problem as of now is that if your infrastructure is deployed as code (CloudFormation), it will fail with a circular resource dependency error and will not deploy. At the time of writing this post it is still possible using the AWS Console and CLI. – Anish Barnwal May 11 '20 at 21:18
  • Personally, I don't like the idea of reconfiguring my queue set up to achieve this and the answer below which uses a simple CLI tool to move the messages felt a lot safer. If you have your AWS creds set up and Go or Node ready to use on your development machine I would suggest going with those, but understand not everyone will. – George Thomas Jan 23 '21 at 12:35
  • 1
    what does `View the content in DLQ` mean? In the web interface, will just polling for messages suffice? – rado May 18 '21 at 15:08
  • Thank you for the solution. I cannot imagine that AWS is not able to implement such a simple solution properly. They introduced DLQ Redrive, but it doesn't support FIFO queues yet. – Novdar Jul 26 '23 at 14:58
53

On Dec 1, 2021, AWS released the ability to redrive messages from a DLQ back to the source queue (or a custom queue).

With dead-letter queue redrive to source queue, you can simplify and enhance your error-handling workflows for standard queues.

[Screenshot: DLQ redrive]

Source:

Introducing Amazon Simple Queue Service dead-letter queue redrive to source queues

BinaryButterfly
FrostyOnion
44

There are a few scripts out there that do this for you:

# Node.js: replay-aws-dlq
# install
npm install replay-aws-dlq

# use
npx replay-aws-dlq [source_queue_url] [dest_queue_url]

# Go: sqsmover
# compile: https://github.com/mercury2269/sqsmover#compiling-from-source

# use
sqsmover -s [source_queue_url] -d [dest_queue_url]
Ulad Kasach
  • 3
    This is the simplest way, unlike the accepted answer. Just run this from a terminal that has the AWS env vars properly set: `npx replay-aws-dlq DL_URI MAIN_URI` – Vasyl Boroviak Apr 01 '20 at 01:54
  • Note typo: dql -> dlq # install npm install replay-aws-dlq; – Lee Oades Jul 10 '20 at 12:33
  • This worked flawlessly for me (note, I only tried the go based one). Seemed to move the messages in stages and not all at once (a good thing) and even had a progress bar. Better than the accepted answer IMO. – Eugene Ananin Nov 06 '20 at 16:31
  • There is a recent AWS blog post that uses a Lambda to accomplish this task. It is also published in the AWS serverless app repository: https://aws.amazon.com/blogs/compute/using-amazon-sqs-dead-letter-queues-to-replay-messages/ (I have not tried it out, as I will go for the quick hack above, but this seems like the way to go) – t-h- Nov 30 '20 at 12:15
  • Works nicely, executed the NPM command from AWS CloudShell – Milan Gatyás Aug 10 '21 at 14:13
  • Activity is low on the GitHub repo and it does not seem to replay FIFO correctly (it modifies the deduplicationId and groupId) :( – Stephane Jun 09 '22 at 12:18
16

You don't need to move the messages, because doing so brings many other challenges: duplicate messages, recovery scenarios, lost messages, de-duplication checks, and so on.

Here is the solution we implemented.

Usually, we use the DLQ for transient errors, not for permanent errors, so we took the approach below (a code sketch of steps 4 and 5 follows the list):

  1. Read the message from the DLQ like a regular queue.

    Benefits
    • Avoids duplicate message processing
    • Better control over the DLQ, e.g. a check to process the DLQ only once the regular queue is completely processed
    • Ability to scale the process up based on the number of messages in the DLQ
  2. Then follow the same code that the regular queue follows.

  3. More reliable in case the job is aborted or the process is terminated while processing (e.g. instance killed or process terminated).

    Benefits
    • Code reusability
    • Error handling
    • Recovery and message replay
  4. Extend the message visibility so that no other thread processes it.

    Benefit
    • Avoids the same record being processed by multiple threads
  5. Delete the message only on success or on a permanent error.

    Benefit
    • Keeps processing as long as the error is transient
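
A minimal boto3 sketch of steps 4 and 5, assuming hypothetical process() and is_permanent_error() helpers that stand in for your own business logic:

import boto3

sqs = boto3.client('sqs')
dlq_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-dlq'  # hypothetical placeholder

resp = sqs.receive_message(QueueUrl=dlq_url, MaxNumberOfMessages=10, WaitTimeSeconds=10)
for msg in resp.get('Messages', []):
    # step 4: extend visibility so no other thread picks up this message
    sqs.change_message_visibility(
        QueueUrl=dlq_url,
        ReceiptHandle=msg['ReceiptHandle'],
        VisibilityTimeout=300,
    )
    try:
        process(msg['Body'])                # hypothetical: same code the regular queue uses
        done = True                         # success: safe to delete
    except Exception as exc:
        done = is_permanent_error(exc)      # hypothetical: True only for permanent errors
    if done:
        # step 5: delete only on success or a permanent error;
        # transient failures stay on the DLQ and are retried
        sqs.delete_message(QueueUrl=dlq_url, ReceiptHandle=msg['ReceiptHandle'])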
Ash
  • I really like your approach! How do you define "permanent error" in this case? – DMac the Destroyer Mar 14 '18 at 21:19
  • Anything with an HTTP status code greater than 200 and less than 500 is a permanent error – Ash May 15 '18 at 06:08
  • 2
    this is indeed a good approach in production. However, I think this post is simply asking how to re-post messages from the DLQ to the normal queue, which sometimes comes in handy if you know what you are doing. – linehrr Sep 10 '18 at 18:34
  • That's why I am saying you shouldn't do it: if you do, it will create more problems. You can move the message like any other message push, but you will lose the DLQ functionality such as the receive count and visibility; it will be treated as a new message. – Ash Sep 27 '18 at 18:19
9

I wrote a small Python script to do this, using the boto3 library:

conf = {
    "sqs-access-key": "",
    "sqs-secret-key": "",
    "reader-sqs-queue": "",
    "writer-sqs-queue": "",
    "message-group-id": ""
}

import boto3
client = boto3.client(
    'sqs',
    aws_access_key_id=conf.get('sqs-access-key'),
    aws_secret_access_key=conf.get('sqs-secret-key')
)

while True:
    messages = client.receive_message(QueueUrl=conf['reader-sqs-queue'], MaxNumberOfMessages=10, WaitTimeSeconds=10)

    if 'Messages' in messages:
        for m in messages['Messages']:
            print(m['Body'])
            # MessageGroupId applies to FIFO queues only; drop it for standard queues
            ret = client.send_message(QueueUrl=conf['writer-sqs-queue'], MessageBody=m['Body'], MessageGroupId=conf['message-group-id'])
            print(ret)
            client.delete_message(QueueUrl=conf['reader-sqs-queue'], ReceiptHandle=m['ReceiptHandle'])
    else:
        print('Queue is currently empty or messages are invisible')
        break

You can get this script at this link.

This script can move messages between any two arbitrary queues, and it supports FIFO queues as well, since you can supply the message_group_id field.

Jagadeesh Govindaraj
linehrr
  • 1
    For FIFO queue, you would need to provide a `MessageDeduplicationId` or have the queue enable `ContentBasedDeduplication` before sending a message. You can reuse the `MessageDeduplicationId` from the message in DLQ. – Sapience Feb 24 '21 at 21:35
8

That looks like your best option. There is a possibility that your process fails after step 2; in that case you'll end up copying the message twice, but your application should be handling re-delivery of messages (or not care) anyway.
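
For illustration, a minimal boto3 sketch of those three steps, with hypothetical placeholder queue URLs:

import boto3

sqs = boto3.client('sqs')
dlq_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-dlq'      # hypothetical
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue'  # hypothetical

resp = sqs.receive_message(QueueUrl=dlq_url, MaxNumberOfMessages=10, WaitTimeSeconds=10)
for msg in resp.get('Messages', []):
    # step 1 was the receive above; step 2: write the message to the original queue
    sqs.send_message(QueueUrl=queue_url, MessageBody=msg['Body'])
    # step 3: delete from the DLQ only after the send succeeded; a crash in
    # between re-delivers the message rather than losing it
    sqs.delete_message(QueueUrl=dlq_url, ReceiptHandle=msg['ReceiptHandle'])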

Dave
6

here:

import sys
import threading
import queue  # Python 3 name of the Python 2 `Queue` module

import boto3

work_queue = queue.Queue()

sqs = boto3.resource('sqs')

from_q_name = sys.argv[1]
to_q_name = sys.argv[2]
print("From: " + from_q_name + " To: " + to_q_name)

from_q = sqs.get_queue_by_name(QueueName=from_q_name)
to_q = sqs.get_queue_by_name(QueueName=to_q_name)

def process_queue():
    while True:
        messages = work_queue.get()

        # re-send the whole batch to the destination queue
        bodies = list()
        for i in range(0, len(messages)):
            bodies.append({'Id': str(i + 1), 'MessageBody': messages[i].body})

        to_q.send_messages(Entries=bodies)

        # delete from the source queue only after the batch has been sent
        for message in messages:
            print("Copied " + str(message.body))
            message.delete()

        work_queue.task_done()

for i in range(10):
    t = threading.Thread(target=process_queue)
    t.daemon = True
    t.start()

while True:
    messages = list()
    for message in from_q.receive_messages(
            MaxNumberOfMessages=10,
            VisibilityTimeout=123,
            WaitTimeSeconds=20):
        messages.append(message)
    if messages:  # avoid queuing empty batches (send_messages rejects empty Entries)
        work_queue.put(messages)

work_queue.join()
Brian Dilley
4

The DLQ comes into play only when the original consumer fails to consume a message successfully after various attempts. We do not want to delete the message, since we believe we can still do something with it (maybe attempt to process it again, log it, or collect some stats), and we do not want to keep encountering this message again and again, blocking the ability to process the other messages behind it.

The DLQ is nothing but just another queue. That means we would need to write a consumer for the DLQ that would ideally run less frequently (compared to the original queue), consume from the DLQ, produce the message back into the original queue, and delete it from the DLQ, if that's the intended behavior and we think the original consumer would now be ready to process it again. It should be OK if this cycle continues for a while, since we also get an opportunity to manually inspect the message, make necessary changes, and deploy another version of the original consumer without losing it (within the message retention period, of course, which is 4 days by default).

It would be nice if AWS provided this capability out of the box, but I don't see it yet; they're leaving this to the end user to use in the way they feel appropriate.

rd2
3

There is another way to achieve this without writing a single line of code. Say your actual queue name is SQS_Queue and its DLQ is SQS_DLQ. Now follow these steps:

  1. Set SQS_Queue as the DLQ of SQS_DLQ. SQS_DLQ is already the DLQ of SQS_Queue, so now each is acting as the DLQ of the other.
  2. Set the max receive count of SQS_DLQ to 1.
  3. Now read messages from the SQS_DLQ console. Since the max receive count is 1, this will send all of the messages to SQS_DLQ's own DLQ, which is your actual SQS_Queue.
  • That will defeat the purpose of maintaining a DLQ. The DLQ is intended to keep failures from overloading your system, so that you can deal with them later. – Buddha Apr 03 '18 at 09:47
  • 1
    It will definitely defeat the purpose, and you will not be able to achieve other benefits like scaling up, throttling, and the receive count. Moreover, you should use the regular queue as the processing queue, and if a message's receive count reaches N, it should go to the DLQ. That is how it should ideally be configured. – Ash May 15 '18 at 06:22
  • 3
    As a 1-time solution to re-drive a lot of messages, this works like a charm. Not a good long-term solution, though. – nmio Aug 30 '18 at 17:41
  • 1
    Yes, this is extremely valuable as a one-time solution to redrive messages (after fixing the problem in the main queue). On AWS CLI the command I used is: `aws sqs receive-message --queue-url --max-number-of-messages 10`. Since the max messages you can read caps at 10, I suggest running the command in a loop like this: `for i in {1..1000}; do ; done` – Patrick Finnigan May 01 '19 at 21:49
3

We use the following script to redrive messages from a source (src) queue to a target (tgt) queue:

filename: redrive.py

usage: python redrive.py -s {source queue name} -t {target queue name}

'''
This script is used to redrive messages in the (src) queue to the (tgt) queue.

The solution is to set the Target Queue as the Source Queue's Dead Letter Queue,
set the Source Queue's redrive policy Maximum Receives to 1,
and set the Source Queue's VisibilityTimeout to 5 seconds (a small period).
Then read data from the Source Queue.

The Source Queue's Redrive Policy will move the messages to the Target Queue.
'''
import argparse
import json
import boto3
sqs = boto3.client('sqs')


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('-s', '--src', required=True,
                        help='Name of source SQS')
    parser.add_argument('-t', '--tgt', required=True,
                        help='Name of targeted SQS')

    args = parser.parse_args()
    return args


def verify_queue(queue_name):
    # get_queue_url raises QueueDoesNotExist when the queue is missing,
    # so catch it instead of relying on a falsy return value
    try:
        return bool(sqs.get_queue_url(QueueName=queue_name).get('QueueUrl'))
    except sqs.exceptions.QueueDoesNotExist:
        return False


def get_queue_attribute(queue_url):
    queue_attributes = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=['All'])['Attributes']
    print(queue_attributes)

    return queue_attributes


def main():
    args = parse_args()
    for q in [args.src, args.tgt]:
        if not verify_queue(q):
            print(f"Cannot find {q} in AWS SQS")
            return

    src_queue_url = sqs.get_queue_url(QueueName=args.src)['QueueUrl']

    target_queue_url = sqs.get_queue_url(QueueName=args.tgt)['QueueUrl']
    target_queue_attributes = get_queue_attribute(target_queue_url)

    # Set the Source Queue's Redrive policy
    redrive_policy = {
        'deadLetterTargetArn': target_queue_attributes['QueueArn'],
        'maxReceiveCount': '1'
    }
    sqs.set_queue_attributes(
        QueueUrl=src_queue_url,
        Attributes={
            'VisibilityTimeout': '5',
            'RedrivePolicy': json.dumps(redrive_policy)
        }
    )
    get_queue_attribute(src_queue_url)

    # Read all messages without deleting them; once a message's receive count
    # exceeds maxReceiveCount (1), the redrive policy moves it to the target queue.
    num_received = 0
    while True:
        try:
            resp = sqs.receive_message(
                QueueUrl=src_queue_url,
                MaxNumberOfMessages=10,
                AttributeNames=['All'],
                WaitTimeSeconds=5)

            num_message = len(resp.get('Messages', []))
            if not num_message:
                break

            num_received += num_message
        except Exception:
            break
    print(f"Redrive {num_received} messages")

    # Reset the Source Queue's Redrive policy
    sqs.set_queue_attributes(
        QueueUrl=src_queue_url,
        Attributes={
            'VisibilityTimeout': '30',
            'RedrivePolicy': ''
        }
    )
    get_queue_attribute(src_queue_url)


if __name__ == "__main__":
    main()
menrfa
1

An AWS Lambda solution worked well for us:

Detailed instructions: https://serverlessrepo.aws.amazon.com/applications/arn:aws:serverlessrepo:us-east-1:303769779339:applications~aws-sqs-dlq-redriver

GitHub: https://github.com/honglu/aws-sqs-dlq-redriver

Deployed with a click and another click to start the redrive!

visrahane
1

Here is also a script (written in TypeScript) to move messages from one AWS queue to another. Maybe it will be useful for someone.


import {
    SQSClient,
    ReceiveMessageCommand,
    DeleteMessageBatchCommand,
    SendMessageBatchCommand,
} from '@aws-sdk/client-sqs'

const AWS_REGION = 'eu-west-1'
const AWS_ACCOUNT = '12345678901'

const DLQ = `https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT}/dead-letter-queue`
const QUEUE = `https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT}/queue`

const loadMessagesFromDLQ = async () => {
    const client = new SQSClient({region: AWS_REGION})
    const command = new ReceiveMessageCommand({
        QueueUrl: DLQ,
        MaxNumberOfMessages: 10,
        VisibilityTimeout: 60,
    })
    const response = await client.send(command)

    console.log('---------LOAD MESSAGES----------')
    console.log(`Loaded: ${response.Messages?.length}`)
    console.log(JSON.stringify(response, null, 4))
    return response
}

const sendMessagesToQueue = async (entries: Array<{Id: string, MessageBody: string}>) => {
    const client = new SQSClient({region: AWS_REGION})
    const command = new SendMessageBatchCommand({
        QueueUrl: QUEUE,
        Entries: entries.map(entry => ({...entry, DelaySeconds: 10})),
        // [
        // {
        //     Id: '',
        //     MessageBody: '',
        //     DelaySeconds: 10
        // }
        // ]
    })
    const response = await client.send(command)
    console.log('---------SEND MESSAGES----------')
    console.log(`Send: Successful - ${response.Successful?.length}, Failed: ${response.Failed?.length}`)
    console.log(JSON.stringify(response, null, 4))
}

const deleteMessagesFromQueue = async (entries: Array<{Id: string, ReceiptHandle: string}>) => {
    const client = new SQSClient({region: AWS_REGION})
    const command = new DeleteMessageBatchCommand({
        QueueUrl: DLQ,
        Entries: entries,
        // [
        //     {
        //         "Id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
        //         "ReceiptHandle": "someReceiptHandle"
        //     }
        // ]
    })
    const response = await client.send(command)
    console.log('---------DELETE MESSAGES----------')
    console.log(`Delete: Successful - ${response.Successful?.length}, Failed: ${response.Failed?.length}`)
    console.log(JSON.stringify(response, null, 4))
}

const run = async () => {
    const dlqMessageList = await loadMessagesFromDLQ()

    if (!dlqMessageList || !dlqMessageList.Messages) {
        console.log('There are no messages in the DLQ')
        return
    }

    const sendMsgList: any = dlqMessageList.Messages.map(msg => ({ Id: msg.MessageId, MessageBody: msg.Body}))
    const deleteMsgList: any = dlqMessageList.Messages.map(msg => ({ Id: msg.MessageId, ReceiptHandle: msg.ReceiptHandle}))

    await sendMessagesToQueue(sendMsgList)
    await deleteMessagesFromQueue(deleteMsgList)
}

run()


P.S. The script has room for improvement (each run moves only a single batch of up to 10 messages), but it might be useful anyway.

elbik
0

Here is a simple Python script you can use from the CLI to do the same, depending only on boto3.

usage

python redrive_messages __from_queue_name__ __to_queue_name__

code

import sys

import boto3

sqs = boto3.resource('sqs')

def redrive_messages(from_queue_name: str, to_queue_name: str):
    # initialize the queues
    from_queue = sqs.get_queue_by_name(QueueName=from_queue_name)
    to_queue = sqs.get_queue_by_name(QueueName=to_queue_name)

    # begin querying for messages
    messages_processed = []
    while True:
        # grab the next message (note: short polling, so this may report
        # empty before the queue is fully drained)
        messages = from_queue.receive_messages(MaxNumberOfMessages=1)
        if len(messages) == 0:
            break
        message = messages[0]

        # requeue it
        to_queue.send_message(MessageBody=message.body, DelaySeconds=0)

        # let the queue know that the message was processed successfully
        messages_processed.append(message)
        message.delete()
    print(f'requeued {len(messages_processed)} messages')

if __name__ == '__main__':
    from_queue_name = sys.argv[1]
    to_queue_name = sys.argv[2]
    redrive_messages(from_queue_name, to_queue_name)
Ulad Kasach
0

Official SDK/CLI support for SQS DLQ redrive has launched (see here).
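
With the SDK support, the redrive can be started programmatically. A minimal boto3 sketch, assuming a hypothetical DLQ ARN; when no destination is specified, messages are moved back to the queues they originally came from:

import boto3

sqs = boto3.client('sqs')

# hypothetical placeholder; the source must be a queue that is configured as a DLQ
dlq_arn = 'arn:aws:sqs:us-east-1:123456789012:my-dlq'

# starts an asynchronous server-side task that moves the messages out of the DLQ
task = sqs.start_message_move_task(SourceArn=dlq_arn)
print(task['TaskHandle'])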

Sinval
Maali