Exceptions:
Exception in thread Thread-ConsumeBidirectionalStream: grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>
The above exception was the direct cause of the following exception:
google.api_core.exceptions.ServiceUnavailable: 503 The service was unable to fulfill your request. Please try again. [code=8a75]
I'm trying to build an IoT prototype that roughly follows Google's end-to-end sample (docs | code), and I am encountering an error in the subscriber when there are no messages in the queue. It happens about a minute after the subscriber starts against an empty queue, and also about a minute after the queue has been emptied, regardless of how many messages were processed before that.
I have found a workaround here on StackOverflow, but I can't get it working: all it seems to do is hide the error, and my subscriber still hangs and doesn't process any further messages. So my question is: how do I get this workaround policy working?
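For comparison with a policy that swallows the error, a common general pattern for long-lived streams is to let the failure surface and re-open the stream with exponential backoff. The sketch below shows only that pattern under stated assumptions; it is not a fix verified against google-cloud-pubsub 0.32.1. `open_stream` is a hypothetical callable that blocks while the stream is healthy and raises when it drops, and `TransientError` is a hypothetical stand-in for retryable errors such as `ServiceUnavailable`.

```python
import time


class TransientError(Exception):
    """Hypothetical stand-in for retryable errors such as ServiceUnavailable."""


def run_forever(open_stream, max_attempts=None, base_delay=1.0, max_delay=60.0,
                sleep=time.sleep):
    """Call open_stream(), re-opening it with exponential backoff on TransientError.

    Returns when open_stream() returns normally, lets any other exception
    propagate, and re-raises once `max_attempts` failures have occurred.
    """
    failures = 0
    while True:
        try:
            open_stream()
            return
        except TransientError:
            failures += 1
            if max_attempts is not None and failures >= max_attempts:
                raise
            # 1s, 2s, 4s, ... capped at max_delay
            delay = min(max_delay, base_delay * 2 ** (failures - 1))
            sleep(delay)


# Usage: a fake stream that drops twice and then closes cleanly.
calls = []
delays = []


def flaky_stream():
    calls.append(1)
    if len(calls) < 3:
        raise TransientError('503 The service was unable to fulfill your request.')


run_forever(flaky_stream, sleep=delays.append)
print(len(calls), delays)
```

Injecting `sleep` keeps the backoff schedule observable without actually waiting; in a real subscriber the default `time.sleep` would be used.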
The relevant bits of code look like this:
import json
import logging
import time

import grpc
from google.api_core.exceptions import ServiceUnavailable
from google.cloud import pubsub
from google.cloud.pubsub_v1.subscriber.message import Message
from google.cloud.pubsub_v1.subscriber.policy import thread

logger = logging.getLogger(__name__)


class WorkaroundPolicy(thread.Policy):
    def on_exception(self, exception):
        # If we are seeing UNAVAILABLE then we need to retry (so return None)
        unavailable = grpc.StatusCode.UNAVAILABLE
        if isinstance(exception, ServiceUnavailable):
            logger.warning('Ignoring google.api_core.exceptions.ServiceUnavailable exception: {}'.format(exception))
            return
        elif getattr(exception, 'code', lambda: None)() in [unavailable]:
            logger.warning('Ignoring grpc.StatusCode.UNAVAILABLE (Original exception: {})'.format(exception))
            return
        # For anything else fall back on the parent class behaviour
        super(WorkaroundPolicy, self).on_exception(exception)


# Other code omitted for brevity. run() belongs to the controller class;
# callback() is defined inside run() here so that it can reach `self`.
def run(self, project_name, subscription_name):
    def callback(msg: Message):
        try:
            data = json.loads(msg.data)
        except ValueError as e:
            logger.error('Loading Payload ({}) threw an Exception: {}.'.format(msg.data, e))
            # For the prototype, if we can't read it, then discard it
            msg.ack()
            return
        device_project_id = msg.attributes['projectId']
        device_registry_id = msg.attributes['deviceRegistryId']
        device_id = msg.attributes['deviceId']
        device_region = msg.attributes['deviceRegistryLocation']
        self._update_device_config(
            device_project_id,
            device_region,
            device_registry_id,
            device_id,
            data)
        msg.ack()

    # Specify a workaround policy to handle StatusCode.UNAVAILABLE [code=8a75] error (but may get CPU issues)
    # subscriber = pubsub.SubscriberClient(policy_class=WorkaroundPolicy)
    # Alternatively, instantiate subscriber without the workaround to see full exception stack
    subscriber = pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_name, subscription_name)
    subscription = subscriber.subscribe(subscription_path, callback)
    # Blocks until the subscription's future completes or raises
    subscription.future.result()
    # Only reached if the future completes without raising
    while True:
        time.sleep(60)
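The `on_exception` override above swallows an error if it is a `ServiceUnavailable`, or if it exposes a gRPC-style `code()` method returning `UNAVAILABLE`; the `getattr(exception, 'code', lambda: None)()` idiom supplies a no-op `code()` for exceptions that lack one. The stdlib-only sketch below mirrors that decision so it can be exercised without a live stream. `StatusCode`, `ServiceUnavailable`, `FakeRpcError` and `HttpLikeError` are hypothetical stand-ins, not the real grpc or google-api-core classes. It adds one guard the original lacks: if an exception carries a non-callable `code` attribute (for example an integer status), the original idiom would raise `TypeError` when calling it, so the sketch only calls `code` when it is callable.

```python
import enum


class StatusCode(enum.Enum):
    """Hypothetical stand-in for grpc.StatusCode (only two members)."""
    OK = 0
    UNAVAILABLE = 14


class ServiceUnavailable(Exception):
    """Hypothetical stand-in for google.api_core.exceptions.ServiceUnavailable."""


class FakeRpcError(Exception):
    """Hypothetical stand-in for a gRPC error exposing a code() method."""

    def __init__(self, status):
        super().__init__(status.name)
        self._status = status

    def code(self):
        return self._status


class HttpLikeError(Exception):
    """Hypothetical exception whose `code` is a plain integer attribute."""
    code = 503


def should_swallow(exception):
    """Return True if the workaround policy's checks would ignore `exception`."""
    if isinstance(exception, ServiceUnavailable):
        return True
    code = getattr(exception, 'code', None)
    # Extra guard (not in the original policy): only call code() when callable.
    status = code() if callable(code) else None
    return status == StatusCode.UNAVAILABLE


results = {
    'ServiceUnavailable': should_swallow(ServiceUnavailable('503')),
    'RPC UNAVAILABLE': should_swallow(FakeRpcError(StatusCode.UNAVAILABLE)),
    'RPC OK': should_swallow(FakeRpcError(StatusCode.OK)),
    'ValueError': should_swallow(ValueError('bad payload')),
    'HttpLikeError': should_swallow(HttpLikeError()),
}
print(results)
```

Note that deciding to swallow an error only controls whether it is re-raised; it does not by itself restart a stream that has already stopped.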
If it helps, the full version of this code can be found on GitHub.
Stack trace/command line output (without policy workaround)
(venv) Freds-MBP:iot-subscriber-issue Fred$ python Controller.py \
--project_id=xyz-tests \
--pubsub_subscription=simple-mqtt-controller \
--service_account_json=/Users/Fred/_dev/gcp-credentials/simple-mqtt-controller-service-account.json
2018-03-21 09:36:20,975 INFO Controller Creating credentials from JSON Key File: "/Users/Fred/_dev/gcp-credentials/simple-mqtt-controller-service-account.json"...
2018-03-21 09:36:20,991 INFO Controller Creating service from discovery URL: "https://cloudiot.googleapis.com/$discovery/rest?version=v1"...
2018-03-21 09:36:20,992 INFO googleapiclient.discovery URL being requested: GET https://cloudiot.googleapis.com/$discovery/rest?version=v1
2018-03-21 09:36:21,508 INFO Controller Creating subscriber for project: "xyz-tests" and subscription: "simple-mqtt-controller"...
2018-03-21 09:36:23,200 INFO Controller Listening for messages on projects/xyz-tests/subscriptions/simple-mqtt-controller...
# This then occurs typically 60 seconds or so (sometimes up to 2 mins) later:
Exception in thread Thread-ConsumeBidirectionalStream:
Traceback (most recent call last):
  File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/api_core/grpc_helpers.py", line 76, in next
    return six.next(self._wrapped)
  File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/grpc/_channel.py", line 347, in __next__
    return self._next()
  File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/grpc/_channel.py", line 341, in _next
    raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 349, in _blocking_consume
    for response in responses:
  File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 476, in _pausable_iterator
    yield next(iterator)
  File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/api_core/grpc_helpers.py", line 78, in next
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "<string>", line 3, in raise_from
google.api_core.exceptions.ServiceUnavailable: 503 The service was unable to fulfill your request. Please try again. [code=8a75]
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 363, in _blocking_consume
    request_generator, response_generator)
  File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 275, in _stop_request_generator
    if not response_generator.done():
AttributeError: '_StreamingResponseIterator' object has no attribute 'done'
^C
Traceback (most recent call last):
  File "Controller.py", line 279, in <module>
    if __name__ == '__main__':
  File "Controller.py", line 274, in main
    try:
  File "Controller.py", line 196, in run
  File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/cloud/pubsub_v1/futures.py", line 111, in result
    err = self.exception(timeout=timeout)
  File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/cloud/pubsub_v1/futures.py", line 133, in exception
    if not self._completed.wait(timeout=timeout):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 551, in wait
    signaled = self._cond.wait(timeout)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 295, in wait
    waiter.acquire()
KeyboardInterrupt
(venv) Freds-MBP:iot-subscriber-issue Fred$
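Read literally, the traceback shows the consumer thread (Thread-ConsumeBidirectionalStream) dying with an `AttributeError` while handling the 503, after which the main thread sits in `future.result()` until Ctrl-C. The stdlib sketch below reproduces that shape in isolation, assuming the worker thread is the only thing that would ever resolve the future: if the worker crashes before calling `set_exception()`, nothing completes the future, and a `timeout` is what lets the caller notice. `consume` and `broken_cleanup` are hypothetical, and `concurrent.futures.Future` stands in for the library's own `google.cloud.pubsub_v1.futures.Future`.

```python
import concurrent.futures
import threading


def broken_cleanup():
    # Hypothetical stand-in for the AttributeError raised in
    # _stop_request_generator while the stream was being shut down.
    raise AttributeError("'_StreamingResponseIterator' object has no attribute 'done'")


def consume(future):
    """Hypothetical consumer loop whose own error handling fails."""
    try:
        raise ConnectionError('503 The service was unable to fulfill your request.')
    except ConnectionError as exc:
        broken_cleanup()           # raises, so the next line never runs
        future.set_exception(exc)


future = concurrent.futures.Future()
worker = threading.Thread(target=consume, args=(future,), name='Thread-Consume')
worker.start()
worker.join()  # the worker has died; its traceback is printed to stderr

try:
    future.result(timeout=1)
    timed_out = False
except concurrent.futures.TimeoutError:
    timed_out = True

print('future done:', future.done(), '| worker alive:', worker.is_alive(),
      '| timed out:', timed_out)
```

Without the `timeout`, the `result()` call would block forever, which matches the `waiter.acquire()` frame interrupted by `KeyboardInterrupt` above.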
Judging by the following GitHub issues (all of which are now closed), this seems to be an ongoing problem:
Pub/Sub Subscriber does not catch & retry UNAVAILABLE errors #4234
Pub/Sub has no way to track errors from the subscriber thread. #3888
PubSub: "set_exception can only be called once." can still occur "erroneously" #4463
I've also found the following items on StackOverflow:
Google PubSub python client returning StatusCode.UNAVAILABLE was posted in October 2017, and the answer is the policy class workaround that I have tried in my code above. However, in my code at least, the proposed answer only hides the error and doesn't allow new messages to be processed.
Google Pubsub Python Client library subscriber crashes randomly seems to have the same cause, but the use case is different. The answer (provided by the questioner) suggests that upgrading to the latest google-cloud solves the problem, but I am already using the latest versions of google-api-core (1.1.0), google-cloud-pubsub (0.32.1), etc.
Google Pub/Sub Subscriber not receiving messages after a while may be related but there is no conclusive answer.
Other info: OS: Mac OS X El Capitan 10.11.6; Python 3.6.2 running in virtualenv 15.1.0
(partial) pip freeze output:
google-api-core==1.1.0
google-api-python-client==1.6.5
google-auth==1.4.1
google-auth-httplib2==0.0.3
google-cloud-pubsub==0.32.1
googleapis-common-protos==1.5.3
grpc-google-iam-v1==0.11.4
grpcio==1.10.0
httplib2==0.10.3
paho-mqtt==1.3.1