26

I'm working on an application whose workflow is managed by passing messages in SQS, using boto.

My SQS queue is growing gradually, and I have no way to check how many elements it is supposed to contain.

Now I have a daemon that periodically polls the queue and checks if I have a fixed-size set of elements. For example, consider the following "queue":

q = ["msg1_comp1", "msg2_comp1", "msg1_comp2", "msg3_comp1", "msg2_comp2"]

Now I want to check if I have "msg1_comp1", "msg2_comp1" and "msg3_comp1" in the queue together at some point in time, but I don't know the size of the queue.

After looking through the API, it seems you can get either 1 element or a fixed number of elements from the queue, but not all of them:

>>> rs = q.get_messages()
>>> len(rs)
1
>>> rs = q.get_messages(10)
>>> len(rs)
10

A suggestion proposed in the answers would be to fetch, for example, 10 messages in a loop until I get nothing back, but messages in SQS have a visibility timeout, meaning that polling elements from the queue doesn't really remove them; they only become invisible for a short period of time.

Is there a simple way to get all messages in the queue, without knowing how many there are?

Charles Menguy

6 Answers

29

I've been working with AWS SQS queues to provide instant notifications, so I need to process all of the messages in real time. The following code will help you efficiently dequeue (all) messages and handle any errors when removing them.

Note: to remove messages from the queue you need to delete them. I'm using the updated boto3 AWS Python SDK, the json library, and the following default values:

import boto3
import json

region_name = 'us-east-1'
queue_name = 'example-queue-12345'
max_queue_messages = 10
message_bodies = []
aws_access_key_id = '<YOUR AWS ACCESS KEY ID>'
aws_secret_access_key = '<YOUR AWS SECRET ACCESS KEY>'
sqs = boto3.resource('sqs', region_name=region_name,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key)
queue = sqs.get_queue_by_name(QueueName=queue_name)
while True:
    messages_to_delete = []
    for message in queue.receive_messages(
            MaxNumberOfMessages=max_queue_messages):
        # process message body
        body = json.loads(message.body)
        message_bodies.append(body)
        # add message to delete
        messages_to_delete.append({
            'Id': message.message_id,
            'ReceiptHandle': message.receipt_handle
        })

    # if you don't receive any notifications the
    # messages_to_delete list will be empty
    if len(messages_to_delete) == 0:
        break
    # delete messages to remove them from the SQS queue
    else:
        delete_response = queue.delete_messages(
                Entries=messages_to_delete)
        # surface any per-entry failures reported by SQS
        if 'Failed' in delete_response:
            print("Failed to delete:", delete_response['Failed'])
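
One caveat, echoed in Jeff's answer below: with short polling, an empty receive does not guarantee the queue is drained, because each call samples only a subset of the SQS servers. A hedged variant of the loop (region and queue name are placeholders) uses long polling via WaitTimeSeconds, so an empty response is a much stronger signal that nothing is left:

import boto3

sqs = boto3.resource('sqs', region_name='us-east-1')
queue = sqs.get_queue_by_name(QueueName='example-queue-12345')

while True:
    # WaitTimeSeconds enables long polling (up to 20 seconds), which waits
    # for messages instead of returning immediately from an empty sample
    messages = queue.receive_messages(MaxNumberOfMessages=10,
                                      WaitTimeSeconds=20)
    if not messages:
        break
    # process message.body here, then delete the batch
    queue.delete_messages(Entries=[
        {'Id': m.message_id, 'ReceiptHandle': m.receipt_handle}
        for m in messages
    ])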
Niklas Rosencrantz
Timothy Liu
  • An adaptation for the v2 `Boto` packages to "backport" the `delete_messages` function from `Boto3` is [here](http://stackoverflow.com/a/40638174/4228193). The built-in `Boto`(2) `delete_message_batch` has a limitation of 10 messages AND requires full `Message`-class objects, rather than just the `ID` and `ReceiptHandles` in an object. – mpag Nov 18 '16 at 17:55
  • Thank you for this answer, it was helpful to solve this question https://stackoverflow.com/questions/62681836/re-process-dlq-events-in-lambda/63206442 – Dos Aug 01 '20 at 14:43
  • I don't think OP needs to delete the messages immediately from the queue. – Memphis Meng May 01 '22 at 23:44
26

Put your call to q.get_messages(n) inside a while loop:

all_messages = []
rs = q.get_messages(10)
while len(rs) > 0:
    all_messages.extend(rs)
    rs = q.get_messages(10)

Additionally, dump won't support more than 10 messages either:

def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'):
    """Utility function to dump the messages in a queue to a file
    NOTE: Page size must be < 10 else SQS errors"""
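
If the visibility timeout is a concern (see the comments below), boto 2's get_messages also takes a per-call visibility_timeout, so already-fetched messages stay hidden while the rest of the queue is drained. A minimal sketch, assuming an illustrative region, queue name, and a 10-minute timeout:

import boto.sqs

conn = boto.sqs.connect_to_region('us-east-1')
q = conn.get_queue('example-queue')

all_messages = []
# keep each fetched batch invisible for 10 minutes while draining
rs = q.get_messages(10, visibility_timeout=600)
while len(rs) > 0:
    all_messages.extend(rs)
    rs = q.get_messages(10, visibility_timeout=600)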
AJ.
  • I can't really do that, since the messages in SQS have a visibility timeout, so if I first get 10 messages, then loop a few times, next time I might be getting the same 10 messages since the timeout has passed. I'm thinking about using `dump()` but i'll have to read the file after, that seems silly, am I missing something? (I could set the visibility_timeout to a very long time, but that seems ugly). – Charles Menguy Apr 16 '12 at 20:09
  • @linker - you said you need to check for 'n' specific messages. does this mean that there is some match criteria to which you are comparing each message? – AJ. Apr 16 '12 at 20:12
  • @linker - According to the reference, the visibility timeout can be up to 12 hours. Unless you're kicking off a massive EC2 job, I'm guessing this would suit your needs? http://docs.amazonwebservices.com/AWSSimpleQueueService/2011-10-01/APIReference/Query_QueryReceiveMessage.html – AJ. Apr 16 '12 at 20:20
  • @linker - btw, number of messages is only supposed to be 1 to 10. If you use something else, the SQS service should return a `ReadCountOutOfRange` error. – AJ. Apr 16 '12 at 20:21
  • This is actually a bit tricky to rely on time in this context, as I'm getting events which for some I have no control over (come from external source, if they have an issue for example and stop sending data for several hours that could pose a problem). I don't understand why there's a dump method, but no get_all_messages, makes no sense to me. – Charles Menguy Apr 16 '12 at 20:21
  • Nice, didn't know about the ReadCountOutOfRange, will I also get this error when doing a dump() if there are more elements? – Charles Menguy Apr 16 '12 at 20:22
  • @linker - have a look at the boto source for the dump method. See my updated answer. – AJ. Apr 16 '12 at 20:26
  • I just tested putting 12 messages in my SQS queue with boto and trying dump() as well, this seems to work fine though, I'm using boto 2.1.1 – Charles Menguy Apr 16 '12 at 20:30
  • Even 2.1.1 has these comments, that's weird, I suspect it's some legacy comment, because otherwise it works fine even with a much larger number of messages. – Charles Menguy Apr 16 '12 at 20:37
  • @linker - interesting, as the public API documentation would suggest this should return an error. http://docs.amazonwebservices.com/AWSSimpleQueueService/2011-10-01/APIReference/Query_QueryReceiveMessage.html I would advise that this is undocumented behavior that may go away (they may start enforcing the limit at any time) so you shouldn't depend on it to work long-term. – AJ. Apr 16 '12 at 20:38
  • Interesting indeed. I'll just settle with this code, this seems pretty similar to what dump() does, thanks ! – Charles Menguy Apr 16 '12 at 20:41
8

My understanding is that the distributed nature of the SQS service pretty much makes your design unworkable. Every time you call get_messages you are talking to a different set of servers, which will have some but not all of your messages. Thus it is not possible to 'check in from time to time' to see if a particular group of messages is ready, and then just accept those.

What you need to do is poll continuously, take all the messages as they arrive, and store them locally in your own data structures. After each successful fetch you can check your data structures to see if a complete set of messages has been collected.

Keep in mind that messages will arrive out of order, and some messages will be delivered twice, as deletes have to propagate to all the SQS servers, but subsequent get requests sometimes beat out the delete messages.
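
A minimal sketch of that pattern with boto3 (the queue name and the expected set are placeholders): poll continuously, de-duplicate by MessageId, store the bodies locally, and check after each fetch whether the complete set has arrived:

import boto3

expected = {"msg1_comp1", "msg2_comp1", "msg3_comp1"}  # the set you are waiting for
seen = {}  # MessageId -> body; also weeds out duplicate deliveries

sqs = boto3.resource('sqs', region_name='us-east-1')
queue = sqs.get_queue_by_name(QueueName='example-queue-12345')

while not expected.issubset(seen.values()):
    for message in queue.receive_messages(MaxNumberOfMessages=10,
                                          WaitTimeSeconds=20):
        seen[message.message_id] = message.body
        message.delete()  # or leave it and let the visibility timeout expire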

Jeff
4

I execute this in a cron job:

from django.core.mail import EmailMessage
from django.conf import settings
import boto3
import json

sqs = boto3.resource('sqs', aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
         aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
         region_name=settings.AWS_REGION)

queue = sqs.get_queue_by_name(QueueName='email')
messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1)

while len(messages) > 0:
    for message in messages:
        mail_body = json.loads(message.body)
        print("E-mail sent to: %s" % mail_body['to'])
        email = EmailMessage(mail_body['subject'], mail_body['message'], to=[mail_body['to']])
        email.send()
        message.delete()

    messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1)
Karl Zillner
0

Something like the code below should do the trick. Sorry it's in C#, but it shouldn't be hard to convert to Python (a rough translation follows the snippet). The dictionary is used to weed out the duplicates.

    public Dictionary<string, Message> GetAllMessages(int pollSeconds)
    {
        var msgs = new Dictionary<string, Message>();
        var end = DateTime.Now.AddSeconds(pollSeconds);

        while (DateTime.Now <= end)
        {
            var request = new ReceiveMessageRequest(Url);
            request.MaxNumberOfMessages = 10;

            var response = GetClient().ReceiveMessage(request);

            foreach (var msg in response.Messages)
            {
                if (!msgs.ContainsKey(msg.MessageId))
                {
                    msgs.Add(msg.MessageId, msg);
                }
            }
        }

        return msgs;
    }
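
A rough Python (boto3) translation of the same idea, in case it helps; the region and queue name are placeholders:

import time
import boto3

def get_all_messages(queue, poll_seconds):
    """Poll for poll_seconds and return {MessageId: Message}, de-duplicated."""
    msgs = {}
    end = time.time() + poll_seconds
    while time.time() <= end:
        for msg in queue.receive_messages(MaxNumberOfMessages=10):
            # the dict weeds out duplicate deliveries, as in the C# version
            msgs.setdefault(msg.message_id, msg)
    return msgs

sqs = boto3.resource('sqs', region_name='us-east-1')
queue = sqs.get_queue_by_name(QueueName='example-queue-12345')
all_msgs = get_all_messages(queue, poll_seconds=30)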
Timothy Gonzalez
0

NOTE: This is not intended as a direct answer to the question. Rather, it is an augmentation of @TimothyLiu's answer, assuming the end user is using the Boto package (aka Boto2), not Boto3. This code is a "Boto-2-ization" of the delete_messages call referred to in his answer.


A Boto(2) call to delete_message_batch(messages_to_delete), where messages_to_delete is a dict with key:value pairs corresponding to id:receipt_handle, returns

AttributeError: 'dict' object has no attribute 'id'.

It seems delete_message_batch expects Message-class objects; copying the Boto source for delete_message_batch and allowing it to use non-Message objects (à la Boto3) also fails if you're deleting more than 10 "messages" at a time. So I had to use the following workaround.

eprint code from here

from __future__ import print_function
import sys
from itertools import islice

def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)

def static_vars(**kwargs):
    # standard decorator recipe required by @static_vars below:
    # it attaches the given attributes to the decorated function
    def decorate(func):
        for k in kwargs:
            setattr(func, k, kwargs[k])
        return func
    return decorate

@static_vars(counter=0)
def take(n, iterable, reset=False):
    "Return next n items of the iterable as same type"
    if reset: take.counter = 0
    take.counter += n
    bob = islice(iterable, take.counter-n, take.counter)
    if isinstance(iterable, dict): return dict(bob)
    elif isinstance(iterable, list): return list(bob)
    elif isinstance(iterable, tuple): return tuple(bob)
    elif isinstance(iterable, set): return set(bob)
    elif isinstance(iterable, file): return file(bob)
    else: return bob

from boto.sqs.batchresults import BatchResults  # used by cx.get_object below

def delete_message_batch2(cx, queue, messages):
  """
  Deletes a list of messages from a queue in a single request.
  Returns a string reflecting the level of success rather than raising an exception or returning True/False.
  :param cx: A boto connection object.
  :param queue: The :class:`boto.sqs.queue.Queue` from which the messages will be deleted.
  :param messages: List of dicts (or dict-like objects) with ``id`` and ``receipt_handle`` keys, e.g. built from :class:`boto.sqs.message.Message` objects.
  """
  listof10s = []
  asSuc, asErr, acS, acE = "","",0,0
  res = []
  it = tuple(enumerate(messages))
  params = {}
  tenmsg = take(10,it,True)
  while len(tenmsg)>0:
    listof10s.append(tenmsg)
    tenmsg = take(10,it)
  while len(listof10s)>0:
    tenmsg = listof10s.pop()
    params.clear()
    for i, msg in tenmsg: #enumerate(tenmsg):
      prefix = 'DeleteMessageBatchRequestEntry'
      numb = (i%10)+1
      p_name = '%s.%i.Id' % (prefix, numb)
      params[p_name] = msg.get('id')
      p_name = '%s.%i.ReceiptHandle' % (prefix, numb)
      params[p_name] = msg.get('receipt_handle')
    try:
      go = cx.get_object('DeleteMessageBatch', params, BatchResults, queue.id, verb='POST')
      (sSuc,cS),(sErr,cE) = tup_result_messages(go)  # tup_result_messages is the author's own helper (not shown)
      if cS:
        asSuc += ","+sSuc
        acS += cS
      if cE:
        asErr += ","+sErr
        acE += cE
    except cx.ResponseError:
      eprint("Error in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params))
    except:
      eprint("Error of unknown type in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params))
  return stringify_final_tup(asSuc, asErr, acS, acE, expect=len(messages)) #mdel #res

def stringify_final_tup(sSuc="", sErr="", cS=0, cE=0, expect=0):
  if sSuc == "": sSuc="None"
  if sErr == "": sErr="None"
  if cS == expect: sSuc="All"
  if cE == expect: sErr="All"
  return "Up to {} messages removed [{}]\t\tMessages remaining ({}) [{}]".format(cS,sSuc,cE,sErr)
mpag