I have a simple Python script that uses Google Cloud Pub/Sub to detect new files in Google Cloud Storage. The script simply adds new messages to a queue, where another thread processes them:

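import time

from google.cloud import pubsub

message_queue = []  # filled by callback_fun, drained by the loop below

subscriber = pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project, subscription_name)

# Start receiving in the background; callback_fun runs for each message.
subscriber.subscribe(subscription_path, callback=callback_fun)

while True:
    if not message_queue:
        # Nothing to do yet; check again in a minute.
        time.sleep(60)
        continue
    # pop() takes the most recently added message.
    process_next_message(message_queue.pop())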

Here, callback_fun simply adds the message to the queue:

def callback_fun(message):
    message.ack()
    message_queue.append(message)

The problem I am having is that after a while (maybe a couple of days), the subscriber stops receiving new file notifications. If I stop and restart the script, it gets all of the notifications at once.

I was wondering if anyone else is having similar issues and/or can suggest ways to troubleshoot (maybe by printing debugging messages that are normally unseen). I am now trying to stop and restart the subscriber, but I am sure this is not the best approach for a production environment.

I am using google-cloud 0.32.0 and google-cloud-pubsub 0.30.1.

ThePyGuy
mrtksy
  • I see you are using a pull subscriber to retrieve the messages. Have you considered adding [flow control](https://cloud.google.com/pubsub/docs/pull#message-flow-control) functionality to control the rate the messages are being received by the subscriber? – Rodrigo C. Feb 27 '18 at 11:18
  • Thanks. I understand that flow control is mainly used for limiting the number of messages that a subscriber receives (e.g., to avoid congestion). My application has a very low message rate (10-20 messages an hour or so), so I'm not worried about congestion. In fact, what I'm trying to do sounds like the opposite: to make sure that I get all the messages. Your suggestion in your other answer is quite interesting and something that I definitely would like to try out. – mrtksy Mar 02 '18 at 20:55

4 Answers

In general, there can be several reasons why a subscriber may stop receiving messages:

  1. If a subscriber does not ack or nack messages, the flow control limits can be reached, meaning no more messages can be delivered. This does not seem to be the case in your particular instance, given that you immediately ack messages. As an aside, I would recommend against acking messages before your queue has processed them unless you are okay with the possibility of messages not being processed. Otherwise, if your app crashes after the ack but before the queue has processed the message, the message will never be processed, and it will not be redelivered because it was already acked.
  2. If another subscriber starts up for the same subscription, it could be receiving the messages. In this scenario, one would expect the subscriber to receive a subset of the messages rather than no messages at all.
  3. Publishers just stop publishing messages and therefore there are no messages to receive. If you restart the subscriber and it starts receiving messages again, this probably isn't the case. You can also verify that a backlog is being built up by looking at the Stackdriver metric for subscription/backlog_bytes.
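To illustrate the aside in point 1, here is a minimal sketch of a callback that acks only after processing succeeds and nacks otherwise. The names `make_callback`, `run`, and `process` are hypothetical, and the wiring in `run` assumes a recent google-cloud-pubsub release in which `subscribe` accepts a `flow_control` argument and returns a future; it may not match the 0.30.x API exactly.

```python
import logging


def make_callback(process):
    """Return a callback that acks a message only after `process` succeeds."""
    def callback(message):
        try:
            process(message)
        except Exception:
            # Processing failed: nack so that Pub/Sub redelivers the message.
            logging.exception("Processing failed; nacking message")
            message.nack()
        else:
            message.ack()
    return callback


def run(project, subscription_name, process, max_messages=10):
    """Subscribe with a flow control limit (assumed recent client library)."""
    # Imported lazily so make_callback can be used without the library.
    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()
    path = subscriber.subscription_path(project, subscription_name)
    # At most `max_messages` unacked messages are held by this client.
    flow_control = pubsub_v1.types.FlowControl(max_messages=max_messages)
    future = subscriber.subscribe(
        path, callback=make_callback(process), flow_control=flow_control)
    future.result()  # blocks; raises if the streaming pull fails
```

With this shape, a crash during processing leaves the message unacked, so it should be redelivered instead of lost.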

If your problem does not fall into one of those categories, it would be best to reach out to Google Cloud support with your project name, topic name, and subscription name so that they can narrow down the issue to either your user code, the client library, or the service.

Kamal Aboul-Hosn
  • I am interested in point #1. Is there a reference to this "flow control limit"? Is it configurable or can it be monitored? – m.spyratos Mar 01 '23 at 16:14

Apart from the flow control suggestion I offered in my previous comment, you could also define a Cloud Function that gets triggered any time a new message is published in a Pub/Sub topic. These Cloud Functions act as subscriptions and will get notified every time a certain event (such as a message being published) occurs.

This tutorial will help you to develop a background Cloud Function that will get triggered when a message is published in a Pub/Sub topic.
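A minimal sketch of such a function in Python, assuming the `(event, context)` background-function signature and assuming the message is a Cloud Storage notification whose JSON payload carries `bucket` and `name` fields. The function name `handle_new_file` is made up for illustration.

```python
import base64
import json


def handle_new_file(event, context):
    """Triggered by a Pub/Sub message; returns (bucket, object name)."""
    # Pub/Sub delivers the message body base64-encoded under "data".
    payload = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    # Assumed payload fields for a Cloud Storage notification.
    bucket, name = payload.get("bucket"), payload.get("name")
    print(f"New file: gs://{bucket}/{name}")
    return bucket, name
```

Heavy processing would still have to happen elsewhere (for example on the existing VM); the function only reacts to the notification.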

Rodrigo C.
  • At first glance, I understand that cloud functions seem to be intended to be used with JavaScript. My application uses python and a lot of processing power in a Google Cloud Engine. What do you think would be the best way to use Cloud Functions with python and GCE ? – mrtksy Mar 02 '18 at 20:58
  • Right now it is true that Cloud Functions can only be built using Node.js. In the future more languages will be available for creating them, including Python. However, when using Cloud Functions as triggers for Pub/Sub, they work in the background and can be integrated with the Pub/Sub operations. That way these functions are integrated with the rest of your project. These functions will be called indirectly any time you publish a message in a Pub/Sub topic, so that any time they get triggered you will be notified that a new message has arrived. – Rodrigo C. Mar 07 '18 at 08:39
  • As of today, You can use Python to write cloud functions. – Arvind Oct 30 '18 at 09:14

I was stuck on this problem for an hour, so here is how I fixed it:

The GOOGLE_APPLICATION_CREDENTIALS environment variable was set to a different service account, one that did not belong to the right project.

project_id = "my_project_sandbox" 

And

my_project.json (service account used by project)

{
  "type": "service_account",
  "project_id": "my_project_prod",
  "private_key_id": "---",
  "private_key": "---",
  ...
}
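
A quick way to spot this kind of mismatch is to compare the `project_id` in the key file with the project the script expects. The sketch below uses only the standard library; the helper name `credentials_project_mismatch` is hypothetical, and a mismatch is only a hint, since a service account can be granted access to another project.

```python
import json
import os


def credentials_project_mismatch(expected_project, env=os.environ):
    """Return the key file's project_id if it differs from expected_project.

    Returns None when the variable is unset or the projects match.
    """
    path = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not path:
        return None  # variable not set; nothing to compare
    with open(path) as f:
        key_project = json.load(f).get("project_id")
    return key_project if key_project != expected_project else None
```

For the files above, `credentials_project_mismatch("my_project_sandbox")` would return `"my_project_prod"`.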
Fabinout

This is not a Python-specific problem; I had it with Node.js as well. Essentially, when the subscription emits an error it falls over and never receives another message. It should not do this; it is a bug.

It took me 3 days to find the fix, but it's simple and it's documented on GitHub. Essentially, you have 2 or 3 options.

(1) Use grpc (whatever that is)

Here's the node code (simple enough to translate to your coding language):

const {PubSub} = require('@google-cloud/pubsub');
const grpc = require('grpc');
const pubsub = new PubSub({grpc});

This is the recommended approach with the downside that the grpc package is now deprecated (by grpc themselves not by Google). There is a package called @grpc/grpc-js that has replaced grpc but I've no idea how to use it in conjunction with @google-cloud/pubsub. Using grpc is the solution I used and I can vouch it works! My subscription keeps receiving messages now even after it errors!

OR alternatively

(2) Re-init PubSub subscription on an error

If you are facing this problem, then just re-initialise the subscription on an error:

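const {PubSub} = require('@google-cloud/pubsub');

// topic, options and handler are assumed to be defined elsewhere.
const initSubscriber = () => {
  const pubsub = new PubSub();
  const subscription = pubsub.subscription(topic, options);
  subscription.on('message', handler.handleMessage);
  // On any subscription error, build a fresh client and subscription.
  subscription.on('error', e => {
    console.error('Subscription error, re-initialising:', e);
    initSubscriber();
  });
};

initSubscriber();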

Whilst this approach is reported to work, when the aforementioned bug no longer exists, then this approach may cause problems / have side effects. I cannot vouch for this approach as I've never tried it. If you are desperate give it a go.
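
For the Python client used in the question, option (2) might translate to roughly the following. This is an untested-in-production sketch: `keep_subscribed` and `start_subscription` are hypothetical names, and it assumes `start_subscription()` returns the future from `subscriber.subscribe(...)`, whose `result()` blocks and raises when the streaming pull fails (the behaviour of recent google-cloud-pubsub releases).

```python
import logging
import time


def keep_subscribed(start_subscription, retry_delay=5.0,
                    max_restarts=None, sleep=time.sleep):
    """Re-create the subscription whenever its future ends with an error.

    Returns the number of restarts once a future completes without error.
    """
    restarts = 0
    while True:
        future = start_subscription()
        try:
            future.result()  # blocks until the subscription terminates
            return restarts
        except Exception:
            logging.exception("Subscriber failed; re-subscribing")
            restarts += 1
            if max_restarts is not None and restarts > max_restarts:
                raise
            sleep(retry_delay)
```

A call could look like `keep_subscribed(lambda: subscriber.subscribe(subscription_path, callback=callback_fun))`. Logging the exception also gives some visibility into why the subscriber stopped.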

(3) Increase acknowledgement deadline

This is not so much a fix as a potential workaround for some scenarios. In my experience, the subscription errors when a message is ack'd after the acknowledgement deadline. Increasing the acknowledgement deadline makes this less likely, so the subscription is less likely to error and fall over in the first place. If the subscription does fall over anyway, solutions (1) and (2) still apply.

(4) General tips

The advice given in @KamalAboul-Hosn answer is useful and may apply to your case. It did not help for me but may help for some.

(5) Bonus tip

In Google Cloud Platform > Pub/Sub > Subscriptions

You can see how many messages have not yet been acknowledged. If acknowledgements are not happening (the unacked message count shown in the graph does not decrease) AFTER your subscription errors THEN you know this solution is the right one for you!

I took the time to write this up to save you 3 days coz J-E^S^-U-S died and rose 3 days later to save your life. He loves you and wanted you to know that :D

danday74