
While experimenting with GCP Pub/Sub, I need to keep an eye on my topics and retrieve the number of undelivered messages. This works well with this Google Query Monitoring snippet: Link.

But I need to group my messages by attributes. Each message has a body with parameters such as {'target':'A'}, and I really need to get something like this:

msg.target | undelivered messages
-----------|---------------------
A          | 34
B          | 42
C          | 42

I can't find a way to access this without consuming the messages.

This is my first attempt:

import json
from collections import Counter

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()

project_id = "xxxx"
subscription_id = "xxxx"

subscription_path = subscriber.subscription_path(project_id, subscription_id)
response = subscriber.pull(
    request={"subscription": subscription_path, "max_messages": 9999}
)

ack_ids = [r.ack_id for r in response.received_messages]

if ack_ids:
    subscriber.modify_ack_deadline(
        request={
            "subscription": subscription_path,
            "ack_ids": ack_ids,
            # Will a deadline of 0 make the messages immediately pullable again?
            "ack_deadline_seconds": 0,
        }
    )

messages = [json.loads(r.message.data.decode()) for r in response.received_messages]

# Parse all messages to get the needed counts per target
counts = Counter(m.get("target") for m in messages)
for target, count in counts.items():
    print(target, count)

But it doesn't work very well. I get a different number of messages each time, so it's impossible to be sure of what I'm looking at.

So this is where I am in my experiments.

I see three possibilities:

  1. Maybe it's possible to access message body attributes directly from Google Query Monitoring?
  2. Maybe my method to consume / parse / release all messages is not written correctly, and that's why it doesn't work well?
  3. Maybe I'm completely wrong and it would be more efficient to create many topics instead of keeping attributes in the message body, OR there is another way to "tag" messages so they can be grouped later in Monitoring?

Do you know how to do this? Thanks a lot in advance for your help!

Tomy137


The first thing to note is that the number of undelivered messages is a property of a subscription, not a topic. If there are multiple subscriptions to the same topic, the number of undelivered messages can differ for each one. There is no way in the Google Query Monitoring system to break down messages by attributes: it has no visibility into the contents of the message backlog, only into metadata such as the number of messages.
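For reference, the backlog number that Monitoring does expose can be read programmatically with the google-cloud-monitoring client library. This is a minimal sketch, assuming the pubsub.googleapis.com/subscription/num_undelivered_messages metric, a five-minute lookback window, and application default credentials; build_interval and undelivered_by_subscription are hypothetical helper names. The only grouping key that comes back is the subscription ID, which is why no per-attribute breakdown is possible.

```python
import time


def build_interval(now: float, lookback_seconds: int = 300) -> dict:
    """Return a TimeInterval-shaped dict covering the last `lookback_seconds`."""
    seconds = int(now)
    nanos = int((now - seconds) * 10**9)
    return {
        "end_time": {"seconds": seconds, "nanos": nanos},
        "start_time": {"seconds": seconds - lookback_seconds, "nanos": nanos},
    }


def undelivered_by_subscription(project_id: str) -> dict:
    """Latest num_undelivered_messages value per subscription (needs credentials)."""
    # Imported lazily so build_interval can be used without the library installed.
    from google.cloud import monitoring_v3

    client = monitoring_v3.MetricServiceClient()
    results = client.list_time_series(
        request={
            "name": f"projects/{project_id}",
            "filter": 'metric.type = "pubsub.googleapis.com/subscription/num_undelivered_messages"',
            "interval": monitoring_v3.TimeInterval(build_interval(time.time())),
            "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
        }
    )
    backlog = {}
    for series in results:
        # Points are listed newest first; the subscription is the only breakdown available.
        if series.points:
            backlog[series.resource.labels["subscription_id"]] = series.points[0].value.int64_value
    return backlog
```

Usage: print(undelivered_by_subscription("xxxx")) gives one number per subscription, never per message attribute.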

The code as you have it has several things that make it problematic for trying to determine the number of messages remaining:

  1. Synchronous pull can return at most 1000 messages per request, so setting max_messages to 9999 will never give you that many messages.
  2. Even with max_messages set to 1000, there is no guarantee that 1000 messages will be returned, even if there are 1000 messages that have not yet been delivered. You would need to issue multiple pull requests in order to fetch all of the messages. Of course, since you nack the messages (by doing a modify_ack_deadline with 0), messages could be redelivered and therefore double counted.
  3. Even though you do the modify_ack_deadline request to nack the messages, while the messages are outstanding to this monitor, they are not available for delivery to your actual subscriber, which delays processing. Furthermore, consider the situation where your monitor crashes for some reason before it gets to perform the modify_ack_deadline. In this situation, those messages would not be delivered to your actual subscriber until the ack deadline you configured in the subscription had passed. If your application is latency-sensitive in any way, this could be a problem.
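To make point 2 concrete: if you still want to inspect the backlog by pulling, you would loop over pull requests and deduplicate by message_id, because messages can be redelivered during the same pass. This is a sketch under stated assumptions (count_backlog and count_targets are hypothetical names, the body is assumed to be JSON with a "target" key as in the question, and an empty or timed-out pull is treated as "nothing left", which is only a heuristic). It does not address point 3: the pulled messages stay hidden from your real subscriber until the final nack.

```python
import json
from collections import Counter


def count_targets(messages) -> Counter:
    """Count (message_id, data) pairs by the JSON "target" key, once per message_id."""
    targets = {}
    for message_id, data in messages:
        targets.setdefault(message_id, json.loads(data.decode("utf-8")).get("target"))
    return Counter(targets.values())


def count_backlog(project_id: str, subscription_id: str, batch_size: int = 1000) -> Counter:
    """Pull until nothing new arrives, nack every lease, then count (needs credentials)."""
    from google.api_core import exceptions
    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()
    path = subscriber.subscription_path(project_id, subscription_id)
    collected, ack_ids, seen_ids = [], [], set()
    while True:
        try:
            response = subscriber.pull(
                request={"subscription": path, "max_messages": batch_size}, timeout=10.0
            )
        except exceptions.DeadlineExceeded:
            break  # Heuristic: a timed-out pull is treated as an empty backlog.
        new_ids = 0
        for received in response.received_messages:
            ack_ids.append(received.ack_id)  # Redeliveries get their own lease to release.
            collected.append((received.message.message_id, received.message.data))
            if received.message.message_id not in seen_ids:
                seen_ids.add(received.message.message_id)
                new_ids += 1
        if new_ids == 0:
            break  # Empty response, or only redeliveries of messages already seen.
    # Until this runs, the pulled messages are unavailable to your real subscriber.
    for start in range(0, len(ack_ids), batch_size):
        subscriber.modify_ack_deadline(
            request={
                "subscription": path,
                "ack_ids": ack_ids[start:start + batch_size],
                "ack_deadline_seconds": 0,
            }
        )
    return count_targets(collected)
```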

A different approach to consider would be to create a second subscription and have a monitoring application that receives all messages. For each message, it looks at the attribute, counts it as a received message for that attribute, and then acknowledges the message. You could report this per-attribute count via a custom metric. In your actual subscriber application, you would also create a custom metric that counts the number of messages received and processed per attribute. To compute the number of messages remaining to process per attribute, you would take the difference of these two numbers.
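A minimal sketch of the second-subscription idea, assuming a dedicated monitoring subscription (monitor_subscription_id is a hypothetical name) and that the grouping key is "target", read from message.attributes when present and otherwise from the JSON body as in the question. The monitor counts and acks everything it receives; the real subscriber would keep an equivalent processed counter, and backlog_per_target takes the difference. report_counts writes one gauge point per target to a custom metric whose name you would choose (for example something under custom.googleapis.com/); the "global" resource type is an assumption that may not suit your setup.

```python
import json
import time
from collections import Counter
from threading import Lock


def extract_target(attributes, data: bytes):
    """Prefer a real Pub/Sub attribute; fall back to the JSON body used in the question."""
    if "target" in attributes:
        return attributes["target"]
    return json.loads(data.decode("utf-8")).get("target")


def backlog_per_target(received: Counter, processed: Counter) -> dict:
    """Per-target difference between what the monitor saw and what was processed."""
    return {target: received[target] - processed[target] for target in received}


def run_monitor(project_id: str, monitor_subscription_id: str,
                received: Counter, timeout: float = 60.0) -> None:
    """Count every message on the monitoring subscription by target, then ack it."""
    from concurrent.futures import TimeoutError
    from google.cloud import pubsub_v1

    lock = Lock()  # The callback is invoked from a thread pool.

    def callback(message) -> None:
        target = extract_target(message.attributes, message.data)
        with lock:
            received[target] += 1
        message.ack()

    subscriber = pubsub_v1.SubscriberClient()
    path = subscriber.subscription_path(project_id, monitor_subscription_id)
    future = subscriber.subscribe(path, callback=callback)
    with subscriber:
        try:
            future.result(timeout=timeout)
        except TimeoutError:
            future.cancel()  # Stop pulling, then wait for the shutdown to finish.
            future.result()


def report_counts(project_id: str, metric_type: str, counts: Counter) -> None:
    """Write one gauge point per target to a custom metric (needs credentials)."""
    from google.cloud import monitoring_v3

    client = monitoring_v3.MetricServiceClient()
    now = time.time()
    seconds, nanos = int(now), int((now - int(now)) * 10**9)
    interval = monitoring_v3.TimeInterval({"end_time": {"seconds": seconds, "nanos": nanos}})
    series_list = []
    for target, count in counts.items():
        series = monitoring_v3.TimeSeries()
        series.metric.type = metric_type
        series.metric.labels["target"] = str(target)
        series.resource.type = "global"
        series.resource.labels["project_id"] = project_id
        series.points = [monitoring_v3.Point({"interval": interval, "value": {"int64_value": count}})]
        series_list.append(series)
    client.create_time_series(name=f"projects/{project_id}", time_series=series_list)
```

Because Pub/Sub delivery is at-least-once by default, either counter can occasionally count a redelivered message twice, so treat the difference as an estimate rather than an exact backlog.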

Alternatively, you could consider separating messages per attribute into different topics. However, there are a few things to consider:

  1. Is the set of attributes fixed and known in advance? If not, how will your subscriber know which subscriptions to subscribe to?
  2. How big is the set of attributes to be checked? There is a limit of 10,000 topics per project and so if you have more attributes than that, this approach will not work.
  3. Are you using flow control to limit how many messages are being processed by your subscriber simultaneously? If so, is the number of messages per attribute uniform? If not, you may have to consider how to divide up the flow control across the subscribers on the different subscriptions.
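If the topic-per-attribute route fits, the publisher has to map each target to its own topic, and the existing per-subscription backlog metric then gives a per-target count directly. This sketch makes points 1 and 2 explicit under assumptions: KNOWN_TARGETS, the "jobs-" naming scheme, topic_id_for and publish_routed are all hypothetical, and each topic (with its subscription) is assumed to exist already.

```python
import json

# Point 1: the set of targets must be fixed and known in advance (hypothetical values).
KNOWN_TARGETS = frozenset({"A", "B", "C"})
# Point 2: the per-project topic limit mentioned above.
MAX_TOPICS_PER_PROJECT = 10_000


def topic_id_for(target: str, prefix: str = "jobs") -> str:
    """Map a target to its dedicated topic ID (hypothetical naming scheme)."""
    if target not in KNOWN_TARGETS:
        raise ValueError(f"unknown target {target!r}; no topic or subscription exists for it")
    return f"{prefix}-{target.lower()}"


def publish_routed(project_id: str, payload: dict) -> str:
    """Publish the payload to the topic for its target and return the message ID."""
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id_for(payload["target"]))
    future = publisher.publish(topic_path, json.dumps(payload).encode("utf-8"))
    return future.result()  # Blocks until Pub/Sub has accepted the message.
```

Rejecting unknown targets at publish time is a deliberate choice here: a message routed to a topic that no subscriber listens to would otherwise go unnoticed.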
Kamal Aboul-Hosn
Thanks a lot for this detailed answer. I'll keep your warning about the pull limit in mind and try your idea of a second subscription. It seems like a very clever idea! I'll update this topic right after testing! – Tomy137 Jan 04 '22 at 22:46