I already have a working pipeline in Data Fusion that handles the whole ETL process, but I need it to run only when it finds a file called SUCCESS.txt in a Cloud Storage bucket.
Is this even possible?
On other platforms I used a file watcher (a job that runs every minute to check whether the file I specified exists in a certain location and, if it is there, executes other jobs), but I can't find anything similar here.
Thanks a lot in advance!

Edgar Guzmán
1 Answer
You can achieve this by using a Cloud Functions GCS trigger with a condition that calls the Data Fusion API to start your pipeline only when the uploaded file is SUCCESS.txt.
Note that the function will trigger on every file upload; the condition only decides whether it calls the Data Fusion API.
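The example script below reads a CDAP_ENDPOINT environment variable, which is the instance's CDAP API endpoint. Here is a minimal sketch for looking it up, assuming the Data Fusion v1 REST API and placeholder project/region/instance names of my own; it reuses the same metadata-server token trick as the example script:

import requests

# Assumption: placeholder values; replace with your own project, region and instance.
PROJECT_ID = "my-project"
REGION = "us-central1"
INSTANCE_ID = "my-datafusion-instance"


def get_cdap_endpoint():
    # Get a token from the metadata server (works on GCP-managed runtimes such as Cloud Functions).
    token = requests.get(
        "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token",
        headers={"Metadata-Flavor": "Google"},
    ).json()["access_token"]
    # Describe the instance through the Data Fusion REST API.
    instance_api = ("https://datafusion.googleapis.com/v1/projects/{}/locations/{}/instances/{}"
                    .format(PROJECT_ID, REGION, INSTANCE_ID))
    r = requests.get(instance_api, headers={"Authorization": "Bearer " + token})
    r.raise_for_status()
    # The instance resource exposes the endpoint as "apiEndpoint".
    return r.json()["apiEndpoint"]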
When you create the Cloud Function:
1. Choose the Cloud Storage trigger type and the Finalize/Create event type.
2. Add the environment variables with your own values and click Next.
3. Set the runtime to Python 3.7, set the entry point to the name of the Python function (in this case, run_pipeline), and add your Python script (or the example below) in main.py.
import requests
import json
import os


def get_access_token():
    # Scope of the API access. Limit it to just Cloud Platform services.
    scopes = 'https://www.googleapis.com/auth/cloud-platform'
    headers = {'Metadata-Flavor': 'Google'}
    # Metadata-server URL with the scopes added.
    api = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token?scopes=" + scopes
    # API call to get the access token.
    r = requests.get(api, headers=headers).json()
    # Return the access token.
    return r['access_token']


def run_pipeline(data, context):
    '''
    Calls the Data Fusion API to start the pipeline.
    '''
    # Get the environment variables set in the initial configuration.
    PROJECT_ID = os.environ.get('PROJECT_ID', 'Specified environment variable is not set.')
    TOPIC_ID = os.environ.get('TOPIC_ID', 'Specified environment variable is not set.')
    PIPELINE_NAME = os.environ.get('PIPELINE_NAME', 'Specified environment variable is not set.')
    INSTANCE_ID = os.environ.get('INSTANCE_ID', 'Specified environment variable is not set.')
    REGION = os.environ.get('REGION', 'Specified environment variable is not set.')
    NAMESPACE_ID = os.environ.get('NAMESPACE_ID', 'Specified environment variable is not set.')
    CDAP_ENDPOINT = os.environ.get('CDAP_ENDPOINT', 'Specified environment variable is not set.')
    # Get the name of the uploaded file.
    file_name = data['name']
    # Get an access token.
    auth_token = get_access_token()
    # Full endpoint for the API call.
    post_endpoint = CDAP_ENDPOINT + "/v3/namespaces/" + NAMESPACE_ID + "/apps/" + PIPELINE_NAME + "/workflows/DataPipelineWorkflow/start"
    # If the pipeline has any macros that need to be set, pass them in as a JSON payload.
    payload = json.dumps({"my-file": file_name})
    # Add the bearer token to the headers.
    post_headers = {"Authorization": "Bearer " + auth_token, "Accept": "application/json"}
    # Condition to start the job:
    if file_name == 'SUCCESS.txt':
        # Start the job.
        r1 = requests.post(post_endpoint, data=payload, headers=post_headers)
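The script only needs the requests library; if it is not already available in the Python 3.7 runtime, declare it in a requirements.txt next to main.py:

requests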
4. Deploy your function and, when it is ready, test it by uploading your SUCCESS.txt file (or any other file, to check that the pipeline does not start).
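If you would rather test from code than from the Console, here is a minimal sketch using the google-cloud-storage client; the bucket name is a placeholder for the bucket your trigger watches:

from google.cloud import storage

# Assumption: "my-trigger-bucket" is the bucket the Cloud Function watches.
client = storage.Client()
bucket = client.bucket("my-trigger-bucket")

# SUCCESS.txt should start the pipeline; any other object name should only trigger the function.
bucket.blob("SUCCESS.txt").upload_from_string("")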
I have tested it and it works fine (based on this post).

Ksign
Unfortunately, when trying to implement it I found that the organization doesn't like Cloud Functions and it's disabled, but I'll try a workaround. – Edgar Guzmán Dec 23 '21 at 21:16