
I have one topic and one subscription with multiple subscribers. My scenario is that I want to process messages on different subscribers, with a specific number of messages being processed at a time. That is, suppose 8 messages are being processed at first; when one message finishes and is acknowledged, the next message should be taken from the topic, while making sure no duplicate message turns up on any subscriber and 8 messages are always being processed in the background.

For this I used the synchronous pull method with max_messages = 8, but the next pull happens only after all the pulled messages have finished processing. So we created our own scheduler, where 8 processes run in the background at the same time, each pulling 1 message at a time, but the next message is still delivered only after all 8 messages have finished processing.

Here is my code:

    #!/usr/bin/env python3

    import logging
    import multiprocessing
    import time
    import sys
    import random
    from google.cloud import pubsub_v1

    project_id = 'xyz'
    subscription_name = 'abc'

    NUM_MESSAGES = 4
    ACK_DEADLINE = 50
    SLEEP_TIME = 20

    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    def worker(msg):
        random_sleep = random.randint(200, 800)
        logger.info("Received message: {}, processing for {} sec".format(msg.message.data, random_sleep))
        time.sleep(random_sleep)  # simulate long-running processing

    def message_puller():
        subscriber = pubsub_v1.SubscriberClient()
        subscription_path = subscriber.subscription_path(project_id, subscription_name)
        while True:
            try:
                response = subscriber.pull(subscription_path, max_messages=1)
                message = response.received_messages[0]
                ack_id = message.ack_id
                process = multiprocessing.Process(target=worker, args=(message,))
                process.start()
                # Keep extending the deadline while the worker is still busy,
                # so the message is not redelivered to another subscriber.
                while process.is_alive():
                    # `ack_deadline_seconds` must be between 10 and 600.
                    subscriber.modify_ack_deadline(subscription_path, [ack_id], ack_deadline_seconds=ACK_DEADLINE)
                    time.sleep(SLEEP_TIME)
                # Final ack, only after processing has finished.
                subscriber.acknowledge(subscription_path, [ack_id])
                logger.info("Acknowledging message: {}".format(message.message.data))
            except Exception as e:
                print(e)
                continue

    def synchronous_pull():
        pullers = [multiprocessing.Process(target=message_puller) for _ in range(NUM_MESSAGES)]

        for p in pullers:
            p.start()

        for p in pullers:
            p.join()

    if __name__ == '__main__':
        synchronous_pull()

Also, sometimes subscriber.pull does not pull any messages even though the while loop is always True. It gives me the error list index (0) out of range, from which I conclude that subscriber.pull is not pulling messages even though there are messages on the topic, but after some time it starts pulling again. Why is that?
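A guard like the following, inside the while loop, avoids the IndexError, since pull can return fewer messages than max_messages, even zero (a minimal sketch; it does not explain the stalled pulls):

    response = subscriber.pull(subscription_path, max_messages=1)
    if not response.received_messages:
        # pull() may return zero messages even when the topic has a backlog
        time.sleep(5)
        continue
    message = response.received_messages[0]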

I have also tried asynchronous pulling with flow control, but duplicate messages were found on multiple subscribers; that attempt is sketched below. If any other method will resolve my issue, let me know. Thanks in advance.
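The asynchronous attempt, reconstructed as a sketch (the callback body is simplified and illustrative):

    from google.cloud import pubsub_v1

    # project_id and subscription_name as defined above
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_name)

    # Allow at most 8 outstanding (unacknowledged) messages in this client.
    flow_control = pubsub_v1.types.FlowControl(max_messages=8)

    def callback(message):
        # long-running processing happens here (hours per message)
        message.ack()

    future = subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control)
    future.result()  # block while messages are processed in the background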

  • Google PubSub ensures each message will be delivered **At-Least Once**. There will be cases where you'll receive the same message more than once. In this case, your program needs to be idempotent (https://stackoverflow.com/questions/1077412/what-is-an-idempotent-operation). – saintlyzero Oct 10 '19 at 12:00

1 Answer


Google Cloud Pub/Sub guarantees at-least-once delivery (docs), which means a message may be delivered more than once. To tackle this, you need to make your program/system idempotent.
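For example, one minimal way to be idempotent is to deduplicate on the message's `message_id` before doing any work; a sketch, where the set stands in for a durable store and `process()` is a placeholder for your actual work:

    processed_ids = set()  # stand-in for a durable store (DB, Redis, ...)

    def handle(received_message):
        msg_id = received_message.message.message_id
        if msg_id in processed_ids:
            return  # duplicate delivery, already handled
        processed_ids.add(msg_id)
        process(received_message.message.data)  # placeholder for the real work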

You have multiple subscribers pulling 8 messages each. To avoid the same message being processed by multiple subscribers, acknowledge the message as soon as a subscriber pulls it and then proceed with processing, rather than acknowledging at the end, after the message has been fully processed.
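Concretely, applied to the puller from the question, that means acknowledging right after the pull and only then handing the message to a worker (a sketch; error handling omitted):

    response = subscriber.pull(subscription_path, max_messages=1)
    for received in response.received_messages:
        # Ack first, so no other subscriber gets this message redelivered.
        subscriber.acknowledge(subscription_path, [received.ack_id])
        multiprocessing.Process(target=worker, args=(received,)).start()

The trade-off: if the worker crashes after the ack, the message is lost unless you persisted it first (see the comments below).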

Also, instead of running your main loop continuously, sleep for some constant time when there are no messages in the queue.

I had similar code that used synchronous pull, except I did not use parallel processing.

Here's the code:

PubSubHandler - Class to handle Pubsub related operations

    from google.cloud import pubsub_v1
    from google.api_core.exceptions import DeadlineExceeded


    class PubSubHandler:

        def __init__(self, subscriber_config):
            self.project_name = subscriber_config['PROJECT_NAME']
            self.subscriber_name = subscriber_config['SUBSCRIBER_NAME']

            self.subscriber = pubsub_v1.SubscriberClient()
            self.subscriber_path = self.subscriber.subscription_path(self.project_name, self.subscriber_name)

        def pull_messages(self, number_of_messages):
            try:
                response = self.subscriber.pull(self.subscriber_path, max_messages=number_of_messages)
                received_messages = response.received_messages
            except DeadlineExceeded:
                # pull() raises DeadlineExceeded when no messages arrive before the RPC deadline.
                received_messages = []
                print('no messages in queue')
            return received_messages

        def ack_messages(self, ack_ids):
            if len(ack_ids) > 0:
                self.subscriber.acknowledge(self.subscriber_path, ack_ids)
                return True
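A quick usage sketch (the project and subscription names are placeholders):

    subscriber_config = {'PROJECT_NAME': 'my-project', 'SUBSCRIBER_NAME': 'my-subscription'}

    handler = PubSubHandler(subscriber_config)
    messages = handler.pull_messages(2)
    handler.ack_messages([m.ack_id for m in messages])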

Utils - Class for util methods

    import json


    class Utils:

        def decoded_data_to_json(self, decoded_data):
            try:
                # Payloads here use single quotes, so normalise them for json.loads.
                decoded_data = decoded_data.replace("'", '"')
                return json.loads(decoded_data)
            except Exception:
                raise Exception('error while parsing json')

        def raw_data_to_utf(self, raw_data):
            try:
                return raw_data.decode('utf8')
            except Exception:
                raise Exception('error converting to UTF')
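A quick usage example:

    utils = Utils()
    decoded = utils.raw_data_to_utf(b"{'key': 'value'}")
    print(utils.decoded_data_to_json(decoded))  # {'key': 'value'}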

Orcestrator - Main script


    import time
    import logging

    from utils import Utils
    from pub_sub_handler import PubSubHandler

    # Placeholder config; replace with your project and subscription names.
    subscriber_config = {
        'PROJECT_NAME': 'my-project',
        'SUBSCRIBER_NAME': 'my-subscription'
    }


    class Orcestrator:

        def __init__(self):
            self.MAX_NUM_MESSAGES = 2
            self.SLEEP_TIME = 10
            self.util_methods = Utils()
            self.pub_sub_handler = PubSubHandler(subscriber_config)

        def main_handler(self):
            to_ack_ids = []
            pulled_messages = self.pub_sub_handler.pull_messages(self.MAX_NUM_MESSAGES)

            if len(pulled_messages) < 1:
                self.SLEEP_TIME = 1
                print('no messages in queue')
                return

            logging.info('messages in queue')
            self.SLEEP_TIME = 10

            for message in pulled_messages:
                raw_data = message.message.data
                try:
                    decoded_data = self.util_methods.raw_data_to_utf(raw_data)
                    json_data = self.util_methods.decoded_data_to_json(decoded_data)
                    print(json_data)
                except Exception as e:
                    logging.error(e)
                to_ack_ids.append(message.ack_id)

            if self.pub_sub_handler.ack_messages(to_ack_ids):
                print('acknowledged msg_ids')


    if __name__ == "__main__":

        orcestrator = Orcestrator()
        print('Receiving data..')
        while True:
            orcestrator.main_handler()
            time.sleep(orcestrator.SLEEP_TIME)

saintlyzero
  • Thanks for the response. But in my scenario there are about 100 messages on the topic and processing time is about 4 to 5 hours per message. 1. If I acknowledge first without processing, then the next messages will arrive at the subscriber; also, if any process crashes we want the message to be processed again on the same or another subscriber. 2. Only once one message has finished processing do we want to pull the next, so that ideally 8 messages are being processed at any time. – Pradnya Shinde Oct 10 '19 at 14:44
  • @PradnyaShinde If you're not acknowledging a message before processing it, then the other subscribers can certainly catch the same message and start processing it. To avoid this, you can do: 1. `Fetch new message` -> 2. `Store the message` (locally or in a DB) -> 3. `Ack message` -> 4. `Process message` (see the sketch after this thread). Now, if the process fails, recover the stored message and process it again, or publish the same message back to the queue so that it may get executed by another subscriber. – saintlyzero Oct 10 '19 at 18:58
  • Totally agree. But per the docs, if one subscriber pulls a message then another subscriber won't receive it until its ack_deadline has expired, which is why I used the modify_ack_deadline approach, checking whether the process is still running. In my code the duplicate-message problem across subscribers is resolved, but I am unable to pull the next message once a process completes. Also, I'd like to know whether flow control is suitable for my scenario. – Pradnya Shinde Oct 11 '19 at 05:27
  • @PradnyaShinde Yeah, extending the ack_deadline will halt redelivery of the same message. But I think there may be a limit to how far the ack_deadline can be extended. I am not sure why you are unable to pull the next message after a process completes. – saintlyzero Oct 12 '19 at 10:16
  • @PradnyaShinde My approach to this problem would be: 1. Use Celery, which will initialize a pool of Python workers and assign messages to each worker based on the number of messages in the queue. Celery will make sure no message goes unprocessed. – saintlyzero Oct 12 '19 at 10:20
  • @PradnyaShinde 2. Don't do multiprocessing; instead, run multiple instances of your program so the processing is decoupled, and a failing parent process won't take down all its child processes. Also, acknowledge the message as soon as you start processing, store the message on disk, and in case of any failure reinsert the message into the queue from disk; maintain the state of each process in a DB. – saintlyzero Oct 12 '19 at 10:21
  • Thanks for the response. With multiple instances I have observed different Pub/Sub behavior: until the message being processed on one instance is done, the other instance is not able to pull the next message. How are the two instances depending on each other? The subscription deadline is 10 sec. – Pradnya Shinde Oct 21 '19 at 07:01
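For reference, a sketch of the fetch -> store -> ack -> process pattern discussed in these comments, with a re-publish on failure (project, subscription, and topic names are placeholders, and `process()` stands in for the real work):

    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()
    publisher = pubsub_v1.PublisherClient()
    subscription_path = subscriber.subscription_path('my-project', 'my-subscription')
    topic_path = publisher.topic_path('my-project', 'my-topic')

    def fetch_store_ack_process():
        response = subscriber.pull(subscription_path, max_messages=1)
        for received in response.received_messages:
            data = received.message.data
            # 2. Store the message before acking, so it can be recovered on failure.
            with open('/tmp/inflight_{}.msg'.format(received.message.message_id), 'wb') as f:
                f.write(data)
            # 3. Ack immediately so no other subscriber picks this message up.
            subscriber.acknowledge(subscription_path, [received.ack_id])
            try:
                process(data)  # 4. Placeholder for the long-running processing.
            except Exception:
                # On failure, push the stored message back onto the topic.
                publisher.publish(topic_path, data)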