
Exceptions:

Exception in thread Thread-ConsumeBidirectionalStream: grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>

The above exception was the direct cause of the following exception:

google.api_core.exceptions.ServiceUnavailable: 503 The service was unable to fulfill your request. Please try again. [code=8a75]

I'm trying to build an IoT prototype that roughly follows Google's end-to-end sample (docs | code), and I am encountering an error in the subscriber when there are no messages in the queue. It happens in two situations: about a minute after the subscriber first starts against an empty queue, and a minute or so after the queue is emptied, regardless of how many messages were processed beforehand.

I have found a workaround here on StackOverflow but can't get it working: all it seems to do is hide the error, and my subscriber still hangs and doesn't process any further messages. So my question is how to get this workaround policy working.

The relevant bits of code look like this:

import logging

from google.api_core.exceptions import ServiceUnavailable
from google.cloud import pubsub
from google.cloud.pubsub_v1.subscriber.message import Message

from google.cloud.pubsub_v1.subscriber.policy import thread
import grpc

logger = logging.getLogger(__name__)

class WorkaroundPolicy(thread.Policy):
    def on_exception(self, exception):
        # If we are seeing UNAVAILABLE then we need to retry (so return None)
        unavailable = grpc.StatusCode.UNAVAILABLE

        if isinstance(exception, ServiceUnavailable):
            logger.warning('Ignoring google.api_core.exceptions.ServiceUnavailable exception: {}'.format(exception))
            return
        elif getattr(exception, 'code', lambda: None)() in [unavailable]:
            logger.warning('Ignoring grpc.StatusCode.UNAVAILABLE (Original exception: {})'.format(exception))
            return

        # For anything else fall back on the parent class behaviour
        super(WorkaroundPolicy, self).on_exception(exception)


# Other imports and code omitted for brevity

def callback(msg: Message):
    try:
        data = json.loads(msg.data)
    except ValueError as e:
        logger.error('Loading Payload ({}) threw an Exception: {}.'.format(msg.data, e))
        # For the prototype, if we can't read it, then discard it
        msg.ack()
        return

    device_project_id = msg.attributes['projectId']
    device_registry_id = msg.attributes['deviceRegistryId']
    device_id = msg.attributes['deviceId']
    device_region = msg.attributes['deviceRegistryLocation']

    self._update_device_config(
      device_project_id,
      device_region,
      device_registry_id,
      device_id,
      data)

    msg.ack()

def run(self, project_name, subscription_name):
    # Specify a workaround policy to handle the StatusCode.UNAVAILABLE [code=8a75] error (but may cause CPU issues)
    #subscriber = pubsub.SubscriberClient(policy_class=WorkaroundPolicy)

    # Alternatively, instantiate subscriber without the workaround to see full exception stack
    subscriber = pubsub.SubscriberClient()

    # Build the fully qualified path: projects/<project>/subscriptions/<subscription>
    subscription_path = subscriber.subscription_path(project_name, subscription_name)
    subscription = subscriber.subscribe(subscription_path, callback)

    # Blocks until the subscription's future resolves
    subscription.future.result()

    while True:
        time.sleep(60)

If it helps, the full version of this can be found in GitHub.
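The retry decision made inside on_exception can be tried out in isolation. What follows is only a minimal sketch under stated assumptions: FakeStatusCode, FakeRpcError and FakeHttpError are hypothetical stand-ins (not part of grpc or google-api-core) so that it runs without either library, and status_of / should_retry are illustrative names. It also guards against an exception that exposes code as a plain value rather than a method, which the getattr(exception, 'code', lambda: None)() idiom would trip over with a TypeError.

```python
import enum


class FakeStatusCode(enum.Enum):
    """Hypothetical stand-in for grpc.StatusCode so the sketch runs without grpc."""
    UNAVAILABLE = "unavailable"
    INTERNAL = "internal"


class FakeRpcError(Exception):
    """Hypothetical error that exposes its status through a code() method."""

    def __init__(self, status):
        super().__init__(status.name)
        self._status = status

    def code(self):
        return self._status


class FakeHttpError(Exception):
    """Hypothetical error that exposes code as a plain attribute, not a method."""
    code = 503


def status_of(exception):
    """Return the exception's status, whether code is a method or a plain value."""
    code = getattr(exception, "code", None)
    return code() if callable(code) else code


def should_retry(exception, retryable=(FakeStatusCode.UNAVAILABLE,)):
    """True when the exception carries one of the retryable status codes."""
    return status_of(exception) in retryable


if __name__ == "__main__":
    for exc in (FakeRpcError(FakeStatusCode.UNAVAILABLE),
                FakeRpcError(FakeStatusCode.INTERNAL),
                FakeHttpError(),
                ValueError("no code attribute at all")):
        print(type(exc).__name__, should_retry(exc))
```

Note that this only reproduces the classification step. It says nothing about whether the consumer thread is actually restarted afterwards, which is the part that seems to go wrong in my case.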

Stack trace/command line output (without policy workaround)

(venv) Freds-MBP:iot-subscriber-issue Fred$ python Controller.py \
     --project_id=xyz-tests \
     --pubsub_subscription=simple-mqtt-controller \
     --service_account_json=/Users/Fred/_dev/gcp-credentials/simple-mqtt-controller-service-account.json

2018-03-21 09:36:20,975 INFO Controller Creating credentials from JSON Key File: "/Users/Fred/_dev/gcp-credentials/simple-mqtt-controller-service-account.json"...
2018-03-21 09:36:20,991 INFO Controller Creating service from discovery URL: "https://cloudiot.googleapis.com/$discovery/rest?version=v1"...
2018-03-21 09:36:20,992 INFO googleapiclient.discovery URL being requested: GET https://cloudiot.googleapis.com/$discovery/rest?version=v1
2018-03-21 09:36:21,508 INFO Controller Creating subscriber for project: "xyz-tests" and subscription: "simple-mqtt-controller"...
2018-03-21 09:36:23,200 INFO Controller Listening for messages on projects/xyz-tests/subscriptions/simple-mqtt-controller...

# This then occurs typically 60 seconds or so (sometimes up to 2 mins) later:

Exception in thread Thread-ConsumeBidirectionalStream:
Traceback (most recent call last):
  File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/api_core/grpc_helpers.py", line 76, in next
    return six.next(self._wrapped)
  File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/grpc/_channel.py", line 347, in __next__
    return self._next()
  File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/grpc/_channel.py", line 341, in _next
    raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 349, in _blocking_consume
    for response in responses:
  File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 476, in _pausable_iterator
    yield next(iterator)
  File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/api_core/grpc_helpers.py", line 78, in next
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "<string>", line 3, in raise_from
google.api_core.exceptions.ServiceUnavailable: 503 The service was unable to fulfill your request. Please try again. [code=8a75]

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 363, in _blocking_consume
    request_generator, response_generator)
  File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 275, in _stop_request_generator
    if not response_generator.done():
AttributeError: '_StreamingResponseIterator' object has no attribute 'done'

^C

Traceback (most recent call last):
  File "Controller.py", line 279, in <module>
    if __name__ == '__main__':
  File "Controller.py", line 274, in main
    try:
  File "Controller.py", line 196, in run
    
  File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/cloud/pubsub_v1/futures.py", line 111, in result
    err = self.exception(timeout=timeout)
  File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/cloud/pubsub_v1/futures.py", line 133, in exception
    if not self._completed.wait(timeout=timeout):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 551, in wait
    signaled = self._cond.wait(timeout)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 295, in wait
    waiter.acquire()
KeyboardInterrupt
(venv) Freds-MBP:iot-subscriber-issue Fred$

This seems to be an ongoing problem, judging by the following issues on GitHub (all of which are now closed):

Pub/Sub Subscriber does not catch & retry UNAVAILABLE errors #4234

Pub/Sub has no way to track errors from the subscriber thread. #3888

PubSub: set_exception can only be called once. can still occur "erroneously" #4463

I've also found the following items on StackOverflow:

Google PubSub python client returning StatusCode.UNAVAILABLE was posted in October 2017 and the answer is the policy class workaround that I have tried in my code above. In my code at least, though, the proposed answer only hides the error and doesn't allow new messages to be processed.

Google Pubsub Python Client library subscriber crashes randomly seems to be the same cause but the use case is different. The answer (provided by the questioner) suggests that upgrading to the latest google-cloud solves the problem but I am already using the latest versions of google-api-core (1.1.0) and google-cloud-pubsub (0.32.1) etc.

Google Pub/Sub Subscriber not receiving messages after a while may be related but there is no conclusive answer.

Other Info:

OS: Mac OS X El Capitan 10.11.6
Python 3.6.2 running in virtualenv 15.1.0

(partial) pip freeze output:

google-api-core==1.1.0 
google-api-python-client==1.6.5 
google-auth==1.4.1 
google-auth-httplib2==0.0.3 
google-cloud-pubsub==0.32.1 
googleapis-common-protos==1.5.3 
grpc-google-iam-v1==0.11.4 
grpcio==1.10.0 
httplib2==0.10.3 
paho-mqtt==1.3.1 
datacentricity
  • Hi @datacentricity thank you for taking the time to write such a comprehensive question. I really hope you get your issue resolved as your question deserves a good answer. I wrote one of the questions and "answers" that you reference in your thread. Contrary to what my thread says, I still have the same problem - pubsub crashes after about a minute even with no messages in queue! – Paul Apr 05 '18 at 18:02

1 Answer

I'm seeing similar behavior to yours, except that, regardless of the workaround policy, the process always hangs without any exception being thrown.

Even stranger is that it's working on a colleague's machine and not mine. I have tried on two other machines and in a Docker container, and they all hang.

Next, I'm going to try to strip everything down to a minimal containerized example I can take to Google and hopefully get some feedback from them.

Let me know if you learn anything in the interim.

hobochild
  • Will do - seems bizarre that it is so intermittent – datacentricity Mar 25 '18 at 09:58
  • Hey @datacentricity, I figured out my issue somewhat. I was running the pubsub client with a Flask server, and the Flask server's reload feature would cause the process to hang. The workaround for me was to only initialize the subscription and the async threads when the process runs and [not on every reload](https://stackoverflow.com/questions/9449101/how-to-stop-flask-from-initialising-twice-in-debug-mode). – hobochild Mar 25 '18 at 20:07
  • As this is a very early prototype, I just have this running in a virtualenv on my mac - containerisation is next on the list so let's see how it goes – datacentricity Mar 26 '18 at 13:59
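
For anyone hitting the Flask reloader variant described in the comments above, one common way to start background threads only once is to check the WERKZEUG_RUN_MAIN environment variable, which Werkzeug's reloader sets to "true" in the child process that actually serves requests. This is only a rough sketch: should_start_background_workers and its reloader_enabled parameter are hypothetical names, and it assumes the subscriber is started from the same module that launches the development server.

```python
import os


def should_start_background_workers(reloader_enabled, environ=None):
    """Decide whether this process should start the subscriber threads.

    reloader_enabled: whether the development server runs with its reloader on.
    environ: mapping to inspect; defaults to os.environ (injectable for testing).
    """
    env = os.environ if environ is None else environ
    if not reloader_enabled:
        # Single process, nothing gets started twice.
        return True
    # With the reloader on, only the child process carries this variable.
    return env.get("WERKZEUG_RUN_MAIN") == "true"


if __name__ == "__main__":
    # Illustrative values only; the real flag would come from the app's config.
    print(should_start_background_workers(reloader_enabled=True, environ={}))
    print(should_start_background_workers(
        reloader_enabled=True, environ={"WERKZEUG_RUN_MAIN": "true"}))
```

With the reloader on, the watcher process skips initialization and the serving child performs it, so the subscription is opened once per served process rather than once per spawned process.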