I'm running the DAG below. It imports functions from dash_workers.py (not included yet -- would that be helpful?) and implements them as tasks defined by PythonOperator. I am using Airflow version 1.8.0:
from datetime import datetime, timedelta
import os
import sys

import airflow.models as af_models
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')

if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environmental variables')
    sys.exit(1)

default_args = {
    'start_date': datetime(2017, 7, 18),
    'schedule_interval': None
}

DAG = af_models.DAG(
    dag_id='dash_preproc',
    default_args=default_args
)

get_id_creds = PythonOperator(
    task_id='get_id_creds',
    python_callable=dash_workers.get_id_creds,
    provide_context=True,
    dag=DAG)

# Read the IDs produced by get_id_creds and create one upload task per ID
with open('/tmp/ids.txt', 'r') as infile:
    ids = infile.read().splitlines()

for uid in ids:
    print('Building transactions for {}'.format(uid))
    upload_transactions = PythonOperator(
        task_id='upload_transactions',
        python_callable=dash_workers.upload_transactions,
        op_args=[uid],
        dag=DAG)
    upload_transactions.set_upstream(get_id_creds)
This results in:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 263, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/imp.py", line 172, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 675, in _load
  File "<frozen importlib._bootstrap>", line 655, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 205, in _call_with_frames_removed
  File "/Users/aaronpolhamus/airflow/dags/dash_dag.py", line 47, in <module>
    upload_transactions.set_upstream(get_id_creds)
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 2478, in set_upstream
    self._set_relatives(task_or_task_list, upstream=True)
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 2458, in _set_relatives
    task.append_only_new(task._downstream_task_ids, self.task_id)
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 2419, in append_only_new
    ''.format(**locals()))
airflow.exceptions.AirflowException: Dependency <Task(PythonOperator): get_rfc_creds>, upload_transactions already registered
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 573, in test
    dag = dag or get_dag(args)
  File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 126, in get_dag
    'parse.'.format(args.dag_id))
airflow.exceptions.AirflowException: dag_id could not be found: dash_preproc. Either the dag did not exist or it failed to parse.
The application here is that I am extracting a list of IDs from a SQL table with get_id_creds and then generating a detailed data profile for each ID. Both functions use MySqlHook internally, and I've tested each function/task standalone to confirm that it behaves as expected in isolation (it does).
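Since dash_workers.py isn't included above, here is a rough sketch of the shape of those two functions; the connection ID, table name, and SQL are placeholders rather than the real code:

# Sketch only -- connection id, table, and SQL are placeholders, not the real dash_workers.py
from airflow.hooks.mysql_hook import MySqlHook

def get_id_creds(**kwargs):
    # Pull the list of IDs and write them to /tmp/ids.txt for the DAG file to read
    hook = MySqlHook(mysql_conn_id='mysql_default')
    rows = hook.get_records('SELECT id FROM creds')
    with open('/tmp/ids.txt', 'w') as outfile:
        outfile.write('\n'.join(str(row[0]) for row in rows))

def upload_transactions(uid):
    # Build the detailed data profile for a single ID
    hook = MySqlHook(mysql_conn_id='mysql_default')
    hook.run('CALL build_profile(%s)', parameters=(uid,))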
The crux of the error seems to be the line airflow.exceptions.AirflowException: Dependency <Task(PythonOperator): get_rfc_creds>, upload_transactions already registered. This suggests that the task gets "registered" on the first pass through the loop, and that on the second pass the parser complains that this operation has already been done. This example script makes it look easy to do what I'm doing here: just embed your downstream task in a for loop. I have no idea why this is failing.
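For reference, the pattern I'm trying to imitate looks roughly like this (paraphrased from memory of that example rather than copied verbatim, so the names are illustrative):

# Paraphrased from the linked example; names are illustrative, not exact
for i in range(5):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),  # the example varies task_id per iteration
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
        dag=dag)
    task.set_upstream(run_this)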
I'm set up for local parallelism with LocalExecutor. My understanding is that if I can get this working, I can run multiple data-profile-generation jobs in parallel on the same machine.
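For completeness, the relevant settings in my airflow.cfg look something like this (the numbers are illustrative, not exact):

[core]
executor = LocalExecutor
parallelism = 32
dag_concurrency = 16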
Where is this error coming from and how can I get this script working?