
I already have a working pipeline in Data Fusion that handles the whole ETL process, but I need it to run only when it finds a file called SUCCESS.txt in a Cloud Storage bucket.
Is this even possible?
On other platforms I used a file watcher (a job that runs every minute to check whether the file I specified exists in a certain location and, if it is there, executes other jobs), but I can't find anything similar here.
Thanks a lot in advance!

1 Answer


You can achieve this with a Cloud Functions GCS trigger plus a condition in the function that calls the Data Fusion API to start your pipeline only when the uploaded file is SUCCESS.txt.
Note that the function will be triggered on every file upload, whether or not it ends up calling the Data Fusion API.
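
For reference, the event passed to the function by a GCS trigger is a dict of the uploaded object's metadata; only the object name is needed for the condition (the values below are just illustrative):

data = {
    "bucket": "my-bucket",    # bucket that fired the trigger (illustrative value)
    "name": "SUCCESS.txt",    # object name, compared against 'SUCCESS.txt' in the function
    "contentType": "text/plain",
    "timeCreated": "2021-12-23T21:16:00.000Z",
}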

When you create the Cloud Function:

1. Choose the Cloud Storage trigger type and the Finalize/Create event type.


2. Add the environment variables used by the script below with your own values, then click Next.


3. Set the runtime to Python 3.7, set the entry point to the name of the Python function (in this case, run_pipeline), and add your Python script (or the example below) in main.py.


import requests
import json
import os

def get_access_token():

    # scope of the API access. Limit it to just cloud platform services
    scopes='https://www.googleapis.com/auth/cloud-platform'
    headers={'Metadata-Flavor': 'Google'}

    # add the scopes
    api="http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token?scopes=" + scopes

    # api call to get the access token
    r = requests.get(api,headers=headers).json()

    # return the access token
    return r['access_token']

def run_pipeline(data, context):
    '''
    Calls the Data Fusion API to start the pipeline
    '''
    
    # get environment variables set in the initial configuration
    PROJECT_ID=os.environ.get('PROJECT_ID', 'Specified environment variable is not set.')
    TOPIC_ID=os.environ.get('TOPIC_ID', 'Specified environment variable is not set.')
    PIPELINE_NAME=os.environ.get('PIPELINE_NAME', 'Specified environment variable is not set.')
    INSTANCE_ID=os.environ.get('INSTANCE_ID', 'Specified environment variable is not set.')
    REGION=os.environ.get('REGION', 'Specified environment variable is not set.')
    NAMESPACE_ID=os.environ.get('NAMESPACE_ID', 'Specified environment variable is not set.')
    CDAP_ENDPOINT=os.environ.get('CDAP_ENDPOINT', 'Specified environment variable is not set.')

    # get uploaded file name
    file_name = data['name']
    
    # get access token
    auth_token=get_access_token()
    
    # api call full endpoint
    post_endpoint = CDAP_ENDPOINT + "/v3/namespaces/" + NAMESPACE_ID + "/apps/" + PIPELINE_NAME + "/workflows/DataPipelineWorkflow/start"
    
    # If the pipeline has any macros that need to be set, you can pass them
    # as runtime arguments in the request body (this must be valid JSON)
    payload = json.dumps({"my-file": file_name})
    
    # add bearer token to the header
    post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
    
    # condition to start the job: only react to SUCCESS.txt
    if file_name == 'SUCCESS.txt':
        # start the pipeline run
        r1 = requests.post(post_endpoint, data=payload, headers=post_headers)
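
If you also want the function's logs to show whether the start call succeeded, the final block can be extended like this (a sketch; the /runs path is the standard CDAP program-runs endpoint for the workflow, and the print output goes to the Cloud Functions logs):

    if file_name == 'SUCCESS.txt':
        r1 = requests.post(post_endpoint, data=payload, headers=post_headers)
        # log the API response to confirm whether the run was accepted
        print('start response:', r1.status_code, r1.text)
        # the runs endpoint lists run records, each with a "status" field
        runs_endpoint = post_endpoint.rsplit('/start', 1)[0] + '/runs'
        runs = requests.get(runs_endpoint, headers=post_headers).json()
        print('latest run:', runs[0] if runs else 'none yet')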

4. Deploy your function and, when it is ready, test it by uploading your SUCCESS.txt file (and then any other file, to confirm that only SUCCESS.txt starts the pipeline).
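
If you prefer to test from a script rather than the console, the upload can be done with the google-cloud-storage client (a sketch; the bucket name is a placeholder for the bucket you chose in step 1):

from google.cloud import storage

# upload an empty SUCCESS.txt to the bucket the trigger watches
client = storage.Client()
bucket = client.bucket("your-trigger-bucket")  # placeholder bucket name
bucket.blob("SUCCESS.txt").upload_from_string("")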

I have tested it and it works fine (Based on this post).

Ksign
  • Thanks for your answer!! Unfortunately, when trying to implement it I found that the organization doesn't like Cloud Functions and it's disabled, but I'll try a workaround. – Edgar Guzmán Dec 23 '21 at 21:16
  • Independently of your organization's policy, if you believe this is a good answer to your question, please consider accepting it so other users may find it easier if they have the same issue. – Ksign Dec 24 '21 at 08:50
  • A possible workaround would be to use [Pub/Sub notifications for Cloud Storage](https://cloud.google.com/storage/docs/pubsub-notifications) with a [notification polling script](https://github.com/googleapis/python-storage/blob/main/samples/snippets/notification_polling.py) but you would need to have it running all the time (unless you have a specific time range when the file is dropped in GCS). – Ksign Dec 24 '21 at 15:58
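
A minimal sketch of that polling approach, assuming a Pub/Sub subscription (here called gcs-notifications, a placeholder) attached to the bucket's notification topic; GCS notifications expose the event type and object name as message attributes:

from google.cloud import pubsub_v1

project_id = "my-project"              # placeholder
subscription_id = "gcs-notifications"  # placeholder: subscription on the bucket's notification topic

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    attrs = message.attributes
    # OBJECT_FINALIZE is sent when an object upload completes
    if attrs.get("eventType") == "OBJECT_FINALIZE" and attrs.get("objectId") == "SUCCESS.txt":
        pass  # call the Data Fusion start endpoint here, as in run_pipeline above
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
with subscriber:
    try:
        streaming_pull_future.result()  # blocks, so the script must stay running
    except KeyboardInterrupt:
        streaming_pull_future.cancel()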