
I've looked over the documentation for Google's Pub/Sub and also tried Google Cloud Monitoring, but couldn't find any way to determine the queue size of my topics.

Since I plan on using Pub/Sub for analytics, it's important for me to monitor the queue count so I can scale the subscriber count up or down.

What am I missing?

vdolez
Gil Adirim

6 Answers


The metric you want to look at is "undelivered messages": the number of messages that have not yet been acknowledged by subscribers, i.e., the queue size. Note that it is a per-subscription metric, not a per-topic metric. You can set up alerts or charts that monitor this metric in Google Cloud Monitoring under the "Pub/Sub Subscription" resource type. For details, see pubsub.googleapis.com/subscription/num_undelivered_messages in the GCP Metrics List (and the other Pub/Sub metrics available there).

Kamal Aboul-Hosn
  • Because I had only signed up for the monitoring service yesterday, I was not yet able to see Pub/Sub under the "Resources" tab - now it's there. Thanks for the help – Gil Adirim Feb 19 '16 at 07:48
  • I tried to query the API directly and got a NotFound error. https://pubsub.googleapis.com/v1//num_undelivered_messages This is a URL. What am I missing? – vwvolodya Jun 30 '16 at 09:14

This might help if you're looking for a programmatic way to achieve it. (Note: `query.Query(...).as_dataframe()` requires the pandas package and, per the comments below, broke with google-cloud-monitoring 2.x.)

from google.cloud import monitoring_v3
from google.cloud.monitoring_v3 import query

project = "my-project"
client = monitoring_v3.MetricServiceClient()

# Query the last 60 minutes of the backlog metric and convert to a DataFrame.
result = query.Query(
    client,
    project,
    'pubsub.googleapis.com/subscription/num_undelivered_messages',
    minutes=60).as_dataframe()

# The DataFrame is indexed by resource type, project, and subscription name
# (replace 'subscription_name' with your own subscription's name).
print(result['pubsub_subscription'][project]['subscription_name'][0])
Steeve
  • Awesome, this saved me tons of time. Thanks! – womp Apr 25 '20 at 18:31
  • @Steeve the above code snippet used to work for me perfectly. But when I deployed same code-base last week, it started breaking. I'm getting following error when trying to use as_dataframe() method – Piyush Shrivastava Oct 26 '20 at 20:44
  • Traceback (most recent call last): File "/usr/local/lib/python3.7/dist-packages/proto/message.py", line 555, in __getattr__ pb_type = self._meta.fields[key].pb_type KeyError: 'WhichOneof' – Piyush Shrivastava Oct 26 '20 at 20:50
  • During handling of the above exception, another exception occurred: Traceback (most recent call last): File "health-check.py", line 149, in check_topics_status result=result.select_interval(end_time=now,start_time=now - datetime.timedelta(minutes=5)).as_dataframe() File "/usr/local/lib/python3.7/dist-packages/google/cloud/monitoring_v3/query.py", line 533, in as_dataframe return _dataframe._build_dataframe(self, label, labels) – Piyush Shrivastava Oct 26 '20 at 20:52
  • File "/usr/local/lib/python3.7/dist-packages/google/cloud/monitoring_v3/_dataframe.py", line 96, in _build_dataframe data=[_extract_value(point.value) for point in time_series.points], File "/usr/local/lib/python3.7/dist-packages/google/cloud/monitoring_v3/_dataframe.py", line 96, in data=[_extract_value(point.value) for point in time_series.points], File "/usr/local/lib/python3.7/dist-packages/google/cloud/monitoring_v3/_dataframe.py", line 49, in _extract_value value_type = typed_value.WhichOneof("value") – Piyush Shrivastava Oct 26 '20 at 20:53
  • File "/usr/local/lib/python3.7/dist-packages/proto/message.py", line 560, in __getattr__ raise AttributeError(str(ex)) AttributeError: 'WhichOneof' – Piyush Shrivastava Oct 26 '20 at 20:54
  • @Steeve is it possible because of protobuf version? – Piyush Shrivastava Oct 26 '20 at 21:00
  • FYI. pip3 freeze shows me google-cloud-monitoring==2.0.0 and protobuf==3.13.0 – Piyush Shrivastava Oct 26 '20 at 21:10
  • Let me review this @Piyush Shrivastava and I'll get back to you – Steeve Oct 28 '20 at 13:42
  • Yes sure. @Steeve – Piyush Shrivastava Oct 29 '20 at 11:57
  • @Steeve are you able to reproduce this issue at your end? – Piyush Shrivastava Nov 03 '20 at 09:31
  • @PiyushShrivastava Updated answer to fix `WhichOneof` and removed `pandas` dependency. – northtree Dec 15 '20 at 05:57

The answer to your question is "no": there is no built-in PubSub feature that shows these counts. The way you have to do it is via monitoring in Stackdriver (it took me some time to find that out too).

In practice, do the following, step by step:

  1. Navigate from the GCloud Admin Console to: Monitoring

[screenshot: navigate from gcloud admin console]

  2. This opens a new window with the separate Stackdriver console
  3. Navigate in Stackdriver: Dashboards > Create Dashboard

[screenshot: create new dashboard within stackdriver]

  4. Click the Add Chart button at the top right of the dashboard screen

  5. In the input box, type num_undelivered_messages and then SAVE

[screenshot: auto-suggested metrics to add chart]
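For reference, the chart built in step 5 is backed by a Monitoring filter along these lines (the resource.type clause narrows it to subscriptions; this is an illustration, not taken from the screenshots):

```
metric.type = "pubsub.googleapis.com/subscription/num_undelivered_messages"
resource.type = "pubsub_subscription"
```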

Mike S.

Updated version based on @Steeve's answer (without the pandas dependency).

Please note that you have to specify end_time instead of using default utcnow().

import datetime

from google.cloud import monitoring_v3
from google.cloud.monitoring_v3 import query

project = 'my-project'
sub_name = 'my-sub'
client = monitoring_v3.MetricServiceClient()

# Query the last minute of the backlog metric for a single subscription.
result = query.Query(
    client,
    project,
    'pubsub.googleapis.com/subscription/num_undelivered_messages',
    end_time=datetime.datetime.now(),
    minutes=1,
    ).select_resources(subscription_id=sub_name)

# Each time series holds the points for one subscription; print the latest value.
for content in result:
    print(content.points[0].value.int64_value)
northtree
  • Nice work and very well done. I didn't have time to review my proposed solution. Yours works and will benefit the community. – Steeve Dec 16 '20 at 13:55
  • @northtree thanks for this alternative approach. Really helps. I was still wondering why the earlier one doesn't work, since monitoring api version is same – Piyush Shrivastava Dec 24 '20 at 11:43
  • @PiyushShrivastava I suppose it’s due to the upgrade of SDK. – northtree Dec 24 '20 at 11:47

Here is a Java version:

package com.example.monitoring;

import static com.google.cloud.monitoring.v3.MetricServiceClient.create;
import static com.google.monitoring.v3.ListTimeSeriesRequest.newBuilder;
import static com.google.monitoring.v3.ProjectName.of;
import static com.google.protobuf.util.Timestamps.fromMillis;
import static java.lang.System.currentTimeMillis;

import com.google.monitoring.v3.ListTimeSeriesRequest;
import com.google.monitoring.v3.TimeInterval;

public class ReadMessagesFromGcp {

  public static void main(String... args) throws Exception {
   
    String projectId = "put here";

    var interval = TimeInterval.newBuilder()
                               .setStartTime(fromMillis(currentTimeMillis() - (120 * 1000)))
                               .setEndTime(fromMillis(currentTimeMillis()))
                               .build();

    var request = newBuilder().setName(of(projectId).toString())
           .setFilter("metric.type=\"pubsub.googleapis.com/subscription/num_undelivered_messages\"")
           .setInterval(interval)
           .setView(ListTimeSeriesRequest.TimeSeriesView.FULL)
           .build();

    var response = create().listTimeSeries(request);

    for (var subscriptionData : response.iterateAll()) {
        
        var subscription = subscriptionData.getResource().getLabelsMap().get("subscription_id");

        var numberOfMessages = subscriptionData.getPointsList().get(0).getValue().getInt64Value();

        if (numberOfMessages > 0) {
            System.out.println(subscription + " has " + numberOfMessages + " messages");
        }
            
    }
  }
}
Maven dependencies:

<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-monitoring</artifactId>
    <version>3.3.2</version>
</dependency>

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java-util</artifactId>
    <version>4.0.0-rc-2</version>
</dependency>

Output:

queue-1 has 36 messages

queue-2 has 4 messages

queue-3 has 3 messages

Bruno Lee

There is a way to count all messages published to a topic using custom metrics.

In my case I am publishing messages to a Pub/Sub topic via a Cloud Composer (Airflow) DAG that runs a Python script.

The Python script logs information about the DAG run:

logging.info(
    f"Total events in file {counter - 1}, "
    f"total successfully published {counter - error_counter - 1}, "
    f"total errors publishing {error_counter}. "
    f"Events sent to topic: {TOPIC_PATH} from filename: {source_blob_name}.",
    {
        "metric": "<some_name>",
        "type": "completed_file",
        "topic": EVENT_TOPIC,
        "filename": source_blob_name,
        "total_events_in_file": counter - 1,
        "failed_published_messages": error_counter,
        "successful_published_messages": counter - error_counter - 1,
    },
)

I then have a Distribution custom metric that filters on resource_type, resource_label, jsonPayload.metric, and jsonPayload.type. The metric also has its Field Name set to jsonPayload.successful_published_messages.

Custom metric filter:

resource.type=cloud_composer_environment AND resource.labels.environment_name={env_name} AND jsonPayload.metric=<some_name> AND jsonPayload.type=completed_file

That custom metric is then used in a Dashboard with the MQL setting of

fetch cloud_composer_environment
| metric
'logging.googleapis.com/user/my_custom_metric'
| group_by 1d, [value_pubsub_aggregate: aggregate(value.pubsub)]
| every 1d
| group_by [],
[value_pubsub_aggregate_sum: sum(value_pubsub_aggregate)]

To get to that, I first set up a chart with resource type cloud_composer_environment, metric my_custom_metric, no preprocessing step, alignment function SUM, period 1 day, and group-by function mean.

Ideally you would just select sum for the group-by function, but it errors; that is why you then need to switch to MQL and manually enter sum instead of mean.


This will now count your published messages for up to 24 months, which is the retention period set by Google for custom metrics.

Hutch