I am using the Google Pub/Sub client v2.2.0 with Python 3.6 as a subscriber.

I want my application to shut down gracefully after acking all the messages it has already received.

Sample code for a subscriber from Google's guide, with minor changes that demonstrate my issue:

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
from time import sleep

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print(f"Received {message}.")
    sleep(30)
    message.ack()
    print("Acked")

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    sleep(10)
    streaming_pull_future.cancel()
    streaming_pull_future.result()

From https://cloud.google.com/pubsub/docs/pull

I expect this code to stop pulling new messages, finish processing the messages already running, and then exit.

In practice, this code stops pulling messages and finishes executing the running callbacks, but the messages are not acked. The .ack() call happens, but the server never receives the ack, so on the next run the same messages are delivered again.

1. Why doesn't the server receive the ack?

2. How do I gracefully shut down the subscriber?

3. What is the expected behavior of .cancel()?

Montoya
  • I had a look at the library; the stop process (cancel) waits for all the threads to end. I thought of something else: what's the ack deadline of your subscription? – guillaume blaquiere Jan 17 '21 at 16:58
  • @guillaumeblaquiere My ack deadline is the default, which is 600 seconds – Montoya Jan 18 '21 at 09:24
  • @JohnHanley Even with a 60-second sleep the ack is still not happening. – Montoya Jan 18 '21 at 09:46
  • The SIGTERM handling happens in much more complicated code, so I made a simple example without it. – Montoya Jan 18 '21 at 09:54
  • In my real application I use a SIGTERM handler to call .cancel(). Here, with much simpler code and without SIGTERM (handling or calling), I observe the same behaviour: messages are not acked after the cancel. Mentioning SIGTERM in the question is confusing, so I will remove it. – Montoya Jan 18 '21 at 10:14
  • I did understand what you were saying. How do you suggest waiting for this background thread to ack the messages? Sleeping did not work for me. – Montoya Jan 18 '21 at 10:17

1 Answer


Update (v2.4.0+)

The client version 2.4.0 added a new optional parameter await_msg_callbacks to the streaming pull future's cancel() method. If set to True, the method will block until all currently executing message callbacks are done and the background message stream has been shut down (the default is False).

try:
    streaming_pull_future.result()
except KeyboardInterrupt:
    streaming_pull_future.cancel(await_msg_callbacks=True)  # blocks until done

A few notes on the new behavior (a fuller shutdown example follows this list):

  • Waiting on callbacks means that any message ACKs generated in them will still be processed (read: sent to the backend).
  • If await_msg_callbacks is False or not given, the shutdown will proceed without waiting. The callbacks might still be running in the background after cancel() returns, but any ACKs generated by them will have no effect, as there won't be any thread running anymore to dispatch the ACK requests to the backend.
  • Messages sitting in the client's internal queues are now automatically NACK-ed on shutdown. This happens regardless of the await_msg_callbacks value.
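
For instance, the SIGTERM-driven shutdown mentioned in the question's comments could be wired up roughly as below with v2.4.0+. This is only a sketch: the project and subscription identifiers are placeholders, and the names handle_sigterm and shutdown_requested, as well as the signal/event plumbing around cancel(await_msg_callbacks=True), are assumptions about one possible structure, not official sample code.

import signal
import threading

from google.cloud import pubsub_v1

# Placeholder identifiers -- substitute your own values.
project_id = "your-project-id"
subscription_id = "your-subscription-id"

shutdown_requested = threading.Event()

def handle_sigterm(signum, frame):
    # Only record the request; the actual shutdown happens in the main flow.
    shutdown_requested.set()

signal.signal(signal.SIGTERM, handle_sigterm)

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print(f"Received {message}.")
    # ... process the message ...
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

with subscriber:
    # Poll with a timeout so the signal handler gets a chance to run promptly
    # on all platforms.
    while not shutdown_requested.wait(timeout=5.0):
        pass
    # Stop pulling new messages and block until the callbacks that are already
    # running have finished, so their ACKs can still be dispatched to the backend.
    streaming_pull_future.cancel(await_msg_callbacks=True)
    streaming_pull_future.result()

The key point is that cancel(await_msg_callbacks=True) runs in the main flow, so the with subscriber: block only closes the transport after the in-flight callbacks (and the ACKs they generate) have been given a chance to complete.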

Original answer (v2.3.0 and below)

The streaming pull is managed in the background by a streaming pull manager. When the streaming pull future is canceled, it invokes the manager's close() method that gracefully shuts down the background helper threads.

One of the things that is shut down is the scheduler - a thread pool used to asynchronously dispatch received messages to the user callback. The key thing to note is that scheduler.shutdown() does not wait for the user callbacks to complete, as that could potentially block "forever"; instead, it empties the executor's work queue and then shuts the executor down:

def shutdown(self):
    """Shuts down the scheduler and immediately end all pending callbacks.
    """
    # Drop all pending item from the executor. Without this, the executor
    # will block until all pending items are complete, which is
    # undesirable.
    try:
        while True:
            self._executor._work_queue.get(block=False)
    except queue.Empty:
        pass
    self._executor.shutdown()

This explains why the ACKs are not sent in the provided code sample: the callbacks sleep for 30 seconds, but the streaming pull future is canceled after only about 10 seconds, while the callbacks are still sleeping. By the time message.ack() is called, the background machinery that dispatches ACK requests to the backend has already been shut down, so the ACKs never reach the server.
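
Before v2.4.0, a workaround along the lines of what the question author describes in the comments below is to do the bookkeeping yourself: refuse new work once shutdown has started, wait for the callbacks that are already running, and only then cancel the streaming pull. The sketch below assumes that approach; shutting_down, in_flight, do_work and graceful_shutdown are made-up names for illustration, and the best-effort caveat about ACKs in the remarks below still applies.

import threading

# Hypothetical bookkeeping for this sketch only.
shutting_down = threading.Event()
in_flight = 0
in_flight_cond = threading.Condition()

def callback(message):
    global in_flight
    with in_flight_cond:
        if shutting_down.is_set():
            # Shutdown already started: hand the message back right away.
            message.nack()
            return
        in_flight += 1
    try:
        do_work(message)  # placeholder for the real processing
        message.ack()
    finally:
        with in_flight_cond:
            in_flight -= 1
            in_flight_cond.notify_all()

def graceful_shutdown(streaming_pull_future):
    # Refuse new work, wait for the callbacks already running to finish,
    # and only then cancel the streaming pull.
    shutting_down.set()
    with in_flight_cond:
        while in_flight:
            in_flight_cond.wait()
    streaming_pull_future.cancel()
    streaming_pull_future.result()

The question author reports in the comments below that a variant of this approach worked for them.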

Misc. remarks

  • Since streaming pull is a long-running operation, we want to block in the main thread so that the program does not exit prematurely. This is done by blocking on the streaming pull future's result:
try:
    streaming_pull_future.result()
except KeyboardInterrupt:
    streaming_pull_future.cancel()

Or after a pre-set timeout:

try:
    streaming_pull_future.result(timeout=123)
except concurrent.futures.TimeoutError:
    streaming_pull_future.cancel()
  • ACK requests are best-effort. Even if the shutdown blocked and waited for the user callbacks to complete, there would still be no guarantee that the messages would actually get acknowledged (requests can get lost in the network, for example).

  • Re: the concern about re-delivered messages ("so next run the same messages return again") - this is actually by design. The backend tries hard to deliver each message at least once, since requests may get lost, and that includes ACK requests from the subscribers; thus a subscriber application must be designed with idempotence in mind. A minimal sketch of what that can look like follows this list.
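
As a minimal illustration of idempotent handling, assuming an in-memory record of processed message IDs is acceptable for the use case (a real application would typically persist this, and do_work is again a placeholder):

# In-memory only; a real application would use durable storage.
# Thread-safety is omitted for brevity.
processed_ids = set()

def callback(message):
    if message.message_id in processed_ids:
        # Duplicate delivery: the work was already done, just ack again.
        message.ack()
        return
    do_work(message)  # placeholder for the real processing
    processed_ids.add(message.message_id)
    message.ack()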

plamut
  • If I do want to wait for pending tasks to complete before closing the connection, can I do it? It seems like a wait=False/True flag would be appropriate here, because in my scenario I wish to stop receiving messages and process the messages that were already received. @plamut – Montoya Jan 18 '21 at 12:58
  • Perhaps by overriding the scheduler that is passed in, but out of the box the answer is probably no, I'm afraid. On the other hand, there have been similar [feature requests](https://github.com/googleapis/python-pubsub/issues/150) in the past, and if there's more demand it might actually get added to the roadmap. – plamut Jan 18 '21 at 13:18
  • I was able to fix this in my code by waiting for the threads that had started processing messages. New threads do not start once a shutdown has begun. After I am sure there are no running threads executing messages, I nack all messages that have not yet been started and call .cancel(). If this were implemented at the library level it would surely be nice. – Montoya Jan 18 '21 at 15:19
  • I agree, Montoya, that it would be useful to fix this behavior. As it stands, even an ack() may be dropped before it reaches the server if the subscriber is shut down before the ack message has a chance to be sent out. We could provide users with a future for the ack() as well, but it may be too complex. In general, the library is optimized for sustained throughput and doesn't do as well with processing a few messages and shutting down. With millions of messages, a few lost acks are not a big deal, but it is a bigger problem at smaller volumes. – Prad Nelluru Jan 19 '21 at 17:47
  • 1
    @Montoya The newest version `v2.4.0` released yesterday adds exactly that - auto NACKs and optionally blocking until callbacks are done executing. See the updated answer for details. – plamut Feb 23 '21 at 11:59