
I have three GCP Cloud Functions written in Python, namely CF1, CF2, and CF3. CF1 checks certain conditions and, depending on the result, should execute CF2 and CF3 in parallel.

I have tried

if condition is true:
    requests.get("url of CF2")
    print("CF2 executed successfully")
    requests.get("url of CF3")
    print("CF3 executed successfully")

CF1 code:

import logging
from datetime import datetime, timedelta

import requests
from google.cloud import bigquery

static_query = "select * from `myproject.mydataset.mytable`"
try:
    # Execute the query and load the result into a temporary table.
    client = bigquery.Client()
    job_config = bigquery.QueryJobConfig()
    dest_dataset = client.dataset(temporary_dataset, temporary_project)
    dest_table = dest_dataset.table(temporary_table)
    job_config.destination = dest_table
    job_config.create_disposition = 'CREATE_IF_NEEDED'
    job_config.write_disposition = 'WRITE_TRUNCATE'
    query_job = client.query(static_query, location=bq_location, job_config=job_config)
    query_job.result()
    table = client.get_table(dest_table)
    expiration = datetime.now() + timedelta(minutes=expiration_time)
    table.expires = expiration
    table = client.update_table(table, ["expires"])
    logging.info("Query result loaded into temporary table: {}".format(temporary_table))

    # Check the row count of the query result in the temporary table.
    count_query = "select count(*) size from `{}.{}.{}`".format(
        temporary_project, temporary_dataset, temporary_table)
    job = client.query(count_query)
    results = job.result()
    count = 0
    for row in results:
        count = row.size

    # If the query result is empty, log an error message to Stackdriver.
    if count == 0:
        logging.error("Query executed with empty result set.")

    # If the query result has records, trigger the two cloud functions
    # below (this should be a parallel execution).
    else:
        # Trigger the CF2 cloud function.
        requests.get("{}".format(cf2_endpoint))
        logging.info("CF2 executed successfully.")

        # Trigger the CF3 cloud function.
        requests.get("{}".format(cf3_endpoint))
        logging.info("CF3 executed successfully.")
except RuntimeError:
    logging.error("Exception occurred {}".format(error_log_client.report_exception()))

Here I want to execute CF2 and CF3 asynchronously. Thank you in advance for any suggestions or solutions.

Kaustubh Ghole

2 Answers


The best service for performing asynchronous calls is Pub/Sub. For this you have to:

  • Create a Pub/Sub topic
  • Deploy CF2 with a trigger on Pub/Sub events on the previously created topic
  • Deploy CF3 with a trigger on Pub/Sub events on the same topic
  • Have CF1 create a Pub/Sub message, with or without parameters, and publish it
  • CF2 and CF3 are triggered in parallel by the message published to Pub/Sub. They extract the parameters from it and do their processing

If CF2 and CF3 already exist and are triggered by HTTP calls, you can set up an HTTP push subscription on the Pub/Sub topic.

If CF2 or CF3 fails, the message is resent to the function until a valid acknowledgement (2XX HTTP response) or until the message TTL expires (7 days by default).

By the way, this way you are decoupled, scalable, and parallel, and any function in error is retried.
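The steps above can be sketched roughly as follows. This is a minimal sketch, not a full CF1: the function names (`publish_trigger`, `decode_trigger`) and the project/topic IDs are placeholders I made up, and the `google-cloud-pubsub` import is deferred into the function so only the publishing part depends on GCP.

```python
import base64
import json

def publish_trigger(project_id, topic_id, params):
    """In CF1: publish one JSON message to the topic that triggers CF2 and CF3.

    Assumes the google-cloud-pubsub client library is installed; project_id
    and topic_id are placeholders for your own values.
    """
    from google.cloud import pubsub_v1  # deferred: GCP-only dependency

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    data = json.dumps(params).encode("utf-8")
    future = publisher.publish(topic_path, data)
    return future.result()  # message ID once the publish is acknowledged

def decode_trigger(event):
    """In CF2/CF3: a Pub/Sub-triggered function receives the message
    base64-encoded in event["data"]; decode it back to the parameters."""
    return json.loads(base64.b64decode(event["data"]).decode("utf-8"))
```

Because both CF2 and CF3 subscribe to the same topic, the single `publish_trigger` call in CF1 fans out to both functions at once, which gives you the parallel execution for free.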

guillaume blaquiere

If you need to make asynchronous requests in Python, you can try the aiohttp or asyncio libraries; there is one example here. You can also check Cloud Pub/Sub or Cloud Tasks.
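For instance, a minimal asyncio sketch of firing both calls concurrently could look like this. The endpoint URLs are placeholders, and the HTTP call itself is simulated with `asyncio.sleep` so the sketch is self-contained; in CF1 you would replace it with an aiohttp GET as shown in the comment.

```python
import asyncio

async def trigger(name, url):
    # In CF1 this would be a real HTTP call, e.g. with aiohttp:
    #   async with aiohttp.ClientSession() as session:
    #       async with session.get(url) as resp:
    #           return name, resp.status
    # Simulated here so the sketch runs without network access.
    await asyncio.sleep(0.1)
    return name, 200

async def fan_out(endpoints):
    # gather() starts both coroutines concurrently and waits for both,
    # so CF2 and CF3 are triggered in parallel rather than one after the other.
    return await asyncio.gather(*(trigger(n, u) for n, u in endpoints.items()))

results = asyncio.run(fan_out({
    "CF2": "https://example.invalid/cf2",  # placeholder endpoint
    "CF3": "https://example.invalid/cf3",  # placeholder endpoint
}))
```

With `asyncio.gather`, the total wait is roughly the slowest call rather than the sum of both, which is the behavior the question is after.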

Harif Velarde