Consider the following example of a DAG where the first task, get_id_creds, extracts a list of credentials from a database. This operation tells me which users in my database I can run further data preprocessing on, and it writes those IDs to the file /tmp/ids.txt. I then read those IDs into my DAG and use them to generate a list of upload_transactions tasks that can be run in parallel.
My question is: Is there a more idiomatic, dynamic way to do this using Airflow? What I have here feels clumsy and brittle. How can I directly pass a list of valid IDs from one process to the one that defines the subsequent downstream processes?
from datetime import datetime, timedelta
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import ds_dependencies
SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environmental variables')
    sys.exit(1)
default_args = {
    'start_date': datetime.now(),
    'schedule_interval': None
}
DAG = DAG(
    dag_id='dash_preproc',
    default_args=default_args
)
get_id_creds = PythonOperator(
    task_id='get_id_creds',
    python_callable=dash_workers.get_id_creds,
    provide_context=True,
    dag=DAG)
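# Read the IDs that get_id_creds wrote on a previous run
with open('/tmp/ids.txt', 'r') as infile:
    uids = infile.read().splitlines()
# Create one upload task per ID
for uid in uids:
    upload_transactions = PythonOperator(
        task_id=uid,
        python_callable=dash_workers.upload_transactions,
        op_args=[uid],
        dag=DAG)
    # Each upload task should run after get_id_creds
    get_id_creds.set_downstream(upload_transactions)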
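To show the file hand-off in isolation, here is a minimal sketch that runs without Airflow. The helpers write_ids and read_ids are hypothetical names, not part of dash_workers, and a temporary file stands in for /tmp/ids.txt. It also illustrates one reason the approach feels brittle: the DAG file is parsed before any task has run, so the reader has to cope with the file not existing yet.

```python
import os
import tempfile


def write_ids(path, ids):
    """Write one ID per line (hypothetical stand-in for what get_id_creds does)."""
    with open(path, "w") as outfile:
        outfile.write("\n".join(ids))


def read_ids(path):
    """Return the IDs listed in the file, or an empty list if the file is missing.

    The DAG definition is evaluated before get_id_creds has ever run,
    so the file may not exist the first time this is called.
    """
    if not os.path.exists(path):
        return []
    with open(path, "r") as infile:
        # Skip blank lines so an empty file yields no task IDs
        return [line for line in infile.read().splitlines() if line]


# Small demonstration using a temporary directory instead of /tmp/ids.txt
with tempfile.TemporaryDirectory() as tmpdir:
    demo_path = os.path.join(tmpdir, "ids.txt")
    print(read_ids(demo_path))   # file not written yet
    write_ids(demo_path, ["u1", "u2"])
    print(read_ids(demo_path))   # IDs available for building tasks
```

In the DAG above, the list returned by read_ids would take the place of uids in the loop that creates the upload tasks. The set of tasks still only changes when the scheduler re-parses the file, which is the limitation I would like to get away from.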