2

Please could someone help me with the Google Pubsub Python Client Library? I am following the tutorial at https://cloud.google.com/pubsub/docs/pull#pubsub-pull-messages-async-python closely and seem to get unprompted errors. I have a simple script called "sendmessage.py" that sends a text message with a random number appended so that I can tell messages apart. The subscriber code runs on a separate compute engine instance and looks like this:

from google.cloud import pubsub_v1

def callback(message):
    print('Received message: {}'.format(message))
    message.ack()

def listen_for_errors():

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path('<my-project-name-here>', 'test-subscription')

    subscription = subscriber.subscribe(subscription_path, callback=callback)

    try:
        subscription.future.result()
    except Exception as e:
        print(
            'Listening for messages on {} threw an Exception: {}.'.format( 'test-subscription', e))
        raise

A screenshot of the send/receive running on two compute instances is attached. Pubsub error The system seems to work fine for the first minute or so, then the subscriber seems to trip up with the following error message:

Exception in thread Thread-ConsumeBidirectionalStream:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/pubsub_v1/subscribe
r/_consumer.py", line 363, in _blocking_consume
    request_generator, response_generator)
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/pubsub_v1/subscribe
r/_consumer.py", line 275, in _stop_request_generator
    if not response_generator.done():
AttributeError: '_StreamingResponseIterator' object has no attribute 'done'

This happens after a short time (less than a few minutes) even if no messages are sent. Once it has crashed, there is no way to recover - e.g. by pressing enter, typing quit(), pressing CTRL+C, etc, so I have to shutdown the instance and start over.

I find it a bit strange that I am following the tutorials so closely and yet there are unprompted errors when my code is running. Please would it be possible for someone to point out where I have gone wrong or suggest a robust workaround to ignore the error and keep listening for messages?

Kind regards and thank you to anyone who can help,

Paul

Developer Guy
  • 2,318
  • 6
  • 19
  • 37
Paul
  • 528
  • 5
  • 17

1 Answers1

4

Im starting to make a habit of responding to my own questions (which possibly means that I should have researched more or tried a little harder before posting here!), but I believe I have resolved the issue above. I thought I would post the answer here in case others are curious or run into the same problem, but if a moderator would prefer to delete the topic then please do so.

Shortly after posting my problem, I tried running the same scripts on my own computer and discovered that they were highly stable, which led me to conclude that the problem was with the way that the compute engine instances were set up.

My original boot code included the lines:

sudo apt-get update
sudo apt-get -yq install python-pip
sudo apt-get install python-dev
sudo pip install --upgrade google-cloud-storage
sudo pip install --upgrade google-cloud-pubsub

I wonder whether the last line should have referenced pubsub_v1, but in any case I changed the bootscript code to this:

sudo apt-get update
sudo apt-get -yq install python-pip
sudo pip install --upgrade google-cloud

And it seems to have resolved the problem.

Paul
  • 528
  • 5
  • 17