While playing with GCP Pub/Sub I need to keep an eye on my topics and retrieve the number of undelivered messages. It's working pretty well with this snippet of Google Query Monitoring : Link.
But I need to group my messages by attributes. Each message gets a body with params like : {'target':'A'}
and I really need to get somethig like that :
msg.target | undelivered messages |
---|---|
A | 34 |
B | 42 |
C | 42 |
I don't succed to access it without consuming messages.
This is my first try :
import json
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
project_id = "xxxx"
subscription_id = "xxxx"
subscription_path = subscriber.subscription_path(project_id, subscription_id)
response = subscriber.pull(
request={"subscription": subscription_path,"max_messages":9999}
)
ack_ids=[r.ack_id for r in response.received_messages]
subscriber.modify_ack_deadline(
request={
"subscription": subscription_path,
"ack_ids": ack_ids,
"ack_deadline_seconds": 0, ## The message will be immediatly 'pullable' again ?
}
)
messages = [ json.loads(r.message.data.decode()) for r in response.received_messages ]
for m in messages :
## Parse all messages to get my needed counts
But it's not working very well. I get a random number of messages each time so it's impossible to be sure of what I'm looking.
So here I am in my experimentations.
I see 3 ways :
- Maybe it's possible to access messages body attributes directly from Google Query Monitoring ?
- Maybe my method to consume / parse / release all messages is not correctly write and that's why it's not working well ?
- Maybe I'm all wrong and it will be more efficient to create many topics instead of keep attributes in messages body OR there is another way to "tag" message to group them after in Monitoring ?
Do you figure how to do this ? Thanks a lot in advance for your help !