
I am using Cloud Tasks. I need to trigger the execution of Task C only when Task A and Task B have been completed successfully. So I need some way of reading / being notified of the statuses of Tasks triggered. But I see no way of doing this in GCP's documentation. Using Node.js SDK to create tasks and Cloud Functions as task handlers if at all that helps.

Edit:

As requested, here is more info on what we are doing:

Tasks 1 - 10 each make HTTP requests, fetch data, update individual collections in Firestore based on this data. These 10 tasks can run in parallel and in no particular order as they don't have any dependency on each other. All of these tasks are actually implemented inside GCF.

Task 11 actually depends on the Firestore collection data updated by Tasks 1 - 10. So it can only run after Tasks 1 - 10 are completed successfully.

We do issue a RunID as a common identifier to group a particular run of all tasks (1 - 11).
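The dependency described above can be sketched in plain Python. This is only an illustration of the grouping logic: the dict stands in for the Firestore collections, and the function names and RunID format are assumptions, not part of the actual implementation.

```python
# Sketch of the RunID grouping described above: each of Tasks 1-10 marks
# itself complete under its RunID, and Task 11 is allowed to run only once
# all ten are done. A dict stands in for the Firestore data.

completed = {}  # run_id -> set of finished task numbers

def mark_done(run_id, task_no):
    # Called by each of Tasks 1-10 when it finishes successfully.
    completed.setdefault(run_id, set()).add(task_no)

def can_run_task_11(run_id):
    # Task 11 may start only when all of Tasks 1-10 have reported in.
    return completed.get(run_id, set()) == set(range(1, 11))

run_id = "run-001"  # hypothetical RunID
for n in range(1, 10):
    mark_done(run_id, n)
assert not can_run_task_11(run_id)  # Task 10 still pending
mark_done(run_id, 10)
assert can_run_task_11(run_id)      # all ten done, Task 11 may run
```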

Raj Chaudhary

2 Answers


Cloud Tasks only triggers tasks; the only condition you can define is a time condition. You have to code the check yourself when Task C runs.

Here is an example process:

  • Task A runs; at the end, it writes to Firestore that it is completed
  • Task B runs; at the end, it writes to Firestore that it is completed
  • Task C starts and checks in Firestore whether A and B are completed.
    • If not, the task exits with an error
    • If yes, it continues the process

You have to configure your Task C queue to retry the task in case of error.
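The check-and-retry step above can be sketched as follows. This is a minimal illustration, not the real handler: the `status_doc` dict stands in for a Firestore document, the field names are assumptions, and in a real Cloud Function the "exit in error" would be a non-2xx HTTP response so that Cloud Tasks retries according to the queue's retry configuration.

```python
# Minimal sketch of the Task C gate. `status_doc` stands in for a Firestore
# document (e.g. one per run); the "taskA"/"taskB" field names are assumptions.

class TasksNotReady(Exception):
    """Raised so the Cloud Tasks queue retries Task C later."""

def run_task_c(status_doc):
    # Task C first verifies that both upstream tasks reported success.
    if not (status_doc.get("taskA") == "done" and status_doc.get("taskB") == "done"):
        # In a real handler this would be a non-2xx response, which makes
        # Cloud Tasks retry the task per the queue's retry settings.
        raise TasksNotReady("A and/or B not completed yet")
    return "task C executed"

# Only A has finished -> Task C refuses to run and would be retried.
status = {"taskA": "done"}
try:
    run_task_c(status)
except TasksNotReady:
    pass

# Later, B finishes too -> Task C proceeds.
status["taskB"] = "done"
result = run_task_c(status)
```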

Another, more expensive, solution is to use Cloud Composer to handle this workflow.

There is no other workflow-management solution on GCP for now.

guillaume blaquiere
    If using this approach you could trigger Task C from a Cloud Firestore `onWrite` listener that listens to the location that Tasks A and B write to. The "check" function would run twice for each pair (the first when only A or B have finished, and the second when the last of the two finished - firing Task C). If Tasks A and B have the concept of a "job ID", you can write to the same document `/async/job10` and use an `onUpdate` listener which will only fire when the document was changed by the longer running task. – samthecodingman Apr 16 '20 at 12:17
    Yes, it's a good alternative. It all depends on your context, the number of tasks to synchronize, and the number of tasks running in parallel. But yes, great idea! – guillaume blaquiere Apr 16 '20 at 14:35
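The listener idea from the comments above can be sketched without Firestore itself: `on_write` below stands in for a Cloud Firestore `onWrite`/`onUpdate` handler, and the document path, job ID, and field names are all assumptions for illustration.

```python
# Sketch of the Firestore-trigger fan-in: an onWrite-style listener runs on
# every update to the shared document (e.g. /async/job10), but only enqueues
# Task C on the write that completes the pair. A dict simulates the document.

enqueued = []  # stands in for the Cloud Tasks queue

def on_write(job_id, doc):
    # Called once per write. The first write (only A or B done) does nothing;
    # the write that completes the pair fires Task C exactly once.
    if doc.get("taskA_done") and doc.get("taskB_done"):
        enqueued.append(job_id)

doc = {}
doc["taskA_done"] = True
on_write("job10", doc)   # first write: only A done, nothing happens
doc["taskB_done"] = True
on_write("job10", doc)   # second write: pair complete, Task C enqueued
```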

Cloud Tasks is not the tool you want to use in this case. Take a look at Cloud Composer, which is built on top of Apache Airflow for GCP.

Edit: You could create a GCF to handle the states of those requests:

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

################ TASK A
# The ten URLs fetched by Task A; the fetches are independent, so they can
# run in parallel.
taskA_list = [
    "https://via.placeholder.com/400",
    "https://via.placeholder.com/410",
    "https://via.placeholder.com/420",
    "https://via.placeholder.com/430",
    "https://via.placeholder.com/440",
    "https://via.placeholder.com/450",
    "https://via.placeholder.com/460",
    "https://via.placeholder.com/470",
    "https://via.placeholder.com/480",
    "https://via.placeholder.com/490",
]

def call2TaskA(url):
    # Fetch one URL and return it together with its HTTP status code.
    response = requests.get(url)
    return (url, response.status_code)

futures = []
results = []
with ThreadPoolExecutor(max_workers=10) as executor:
    for url in taskA_list:
        futures.append(executor.submit(call2TaskA, url))

# Collect the results as they complete and validate each one.
isOkayToDoTaskB = True
for taskA in as_completed(futures):
    result = taskA.result()
    if result[1] != 200:  # your validation on Task A
        isOkayToDoTaskB = False
    results.append(result)

# Abort before Task B if any fetch failed; the error reports all results.
if not isOkayToDoTaskB:
    raise ValueError('Problems: {}'.format(results))

################ TASK B
def doTaskB():
    pass

doTaskB()
Juancki