
I'm running this DAG. It imports functions from dash_workers.py (not included yet; would that be helpful?) and implements those functions as tasks defined by PythonOperator. I am using Airflow version 1.8.0:

from datetime import datetime, timedelta
import os
import sys

import airflow.models as af_models
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environmental variables')
    sys.exit(1)

default_args = {
  'start_date': datetime(2017, 7, 18),
  'schedule_interval': None
}

DAG = af_models.DAG(
  dag_id='dash_preproc',
  default_args=default_args
)

get_id_creds = PythonOperator(
    task_id='get_id_creds',
    python_callable=dash_workers.get_id_creds, 
    provide_context=True,
    dag=DAG)

with open('/tmp/ids.txt', 'r') as infile:
    ids = infile.read().splitlines()

for uid in ids:
    print('Building transactions for {}'.format(uid))
    upload_transactions = PythonOperator(
        task_id='upload_transactions',
        python_callable=dash_workers.upload_transactions, 
        op_args=[uid],
        dag=DAG)
    upload_transactions.set_upstream(get_id_creds)

This results in:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 263, in process_file
    m = imp.load_source(mod_name, filepath)
  File   "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/pyth on3.6/imp.py", line 172, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 675, in _load
  File "<frozen importlib._bootstrap>", line 655, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 205, in _call_with_frames_removed
  File "/Users/aaronpolhamus/airflow/dags/dash_dag.py", line 47, in  <module>
    upload_transactions.set_upstream(get_id_creds)
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 2478, in set_upstream
    self._set_relatives(task_or_task_list, upstream=True)
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 2458, in _set_relatives
    task.append_only_new(task._downstream_task_ids, self.task_id)
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 2419, in append_only_new
    ''.format(**locals()))
airflow.exceptions.AirflowException: Dependency <Task(PythonOperator): get_rfc_creds>, upload_transactions already registered
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 573, in test
    dag = dag or get_dag(args)
  File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 126, in get_dag
    'parse.'.format(args.dag_id))
airflow.exceptions.AirflowException: dag_id could not be found: dash_preproc. Either the dag did not exist or it failed to parse.

The application here is that I am extracting a list of IDs from a SQL table using get_id_creds and then generating detailed data profiles on a per-ID basis. Both functions use MySqlHook internally, and I've tested each function/task standalone to make sure it behaves as expected in isolation (it does).

The crux of the error seems to be the line airflow.exceptions.AirflowException: Dependency <Task(PythonOperator): get_rfc_creds>, upload_transactions already registered. This seems to suggest that on the first pass through the loop the task is "registered", and then on the second pass the parser complains that it has already done that operation. This example script makes it look easy to do what I'm doing here: just embed your downstream task within a for loop. I have no idea why this is failing.

I'm set up for local parallelism with LocalExecutor. My understanding is that if I can get this working I can run multiple data profile generation jobs in parallel on the same machine.
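
For reference, local parallelism in my setup just means the executor settings in airflow.cfg, roughly as sketched below (the numeric values are illustrative, not my exact configuration):

[core]
executor = LocalExecutor
# upper bound on task instances running at once across the whole installation
parallelism = 8
# cap on concurrently running task instances within a single DAG
dag_concurrency = 8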

Where is this error coming from and how can I get this script working?

aaron

1 Answer


Not directly related to your problem, but you don't need to import all of airflow.models in your case; just do from airflow.models import DAG and make the corresponding changes.
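
A minimal sketch of that change (note that your DAG object is currently also named DAG, so you would want to rename the variable, e.g. to dag, to avoid shadowing the imported class):

from airflow.models import DAG  # import the class directly instead of the whole module

dag = DAG(  # renamed from DAG so it doesn't shadow the imported class
    dag_id='dash_preproc',
    default_args=default_args
)

You would then pass dag=dag to your PythonOperator calls.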

You pointed to an example that shows a DAG generating PythonOperator tasks dynamically, but it seems you didn't quite understand it.

In your case you must assign the task names (the task_id) dynamically, so that each new task can be registered and shown in the webserver.

for idx, uid in enumerate(ids):
    print('Building transactions for {}'.format(uid))
    upload_transactions = PythonOperator(
        task_id='upload_transactions_' + str(idx),  # unique task_id on every pass through the loop
        python_callable=dash_workers.upload_transactions,
        op_args=[uid],
        dag=DAG)
    upload_transactions.set_upstream(get_id_creds)  # keep the upstream dependency from your original loop

By adding the index of the current uid to the task name, each task gets a unique name. I did not use the uid itself because I don't know whether every element in your list is unique. If it is, you can remove the enumerate() and use uid directly.
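
If the uids are indeed unique (and contain only characters Airflow accepts in task IDs, i.e. alphanumerics, dashes, dots and underscores), a sketch of that variant:

for uid in ids:
    print('Building transactions for {}'.format(uid))
    upload_transactions = PythonOperator(
        task_id='upload_transactions_{}'.format(uid),  # unique per uid
        python_callable=dash_workers.upload_transactions,
        op_args=[uid],
        dag=DAG)
    upload_transactions.set_upstream(get_id_creds)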

I hope this helps. Cheers!

sdikby
  • that's super-helpful, thanks! this is almost certainly the issue. I will implement the solution when I'm back in the office and return to select your answer if that fixes the problem. – aaron Jul 24 '17 at 21:49
  • (I will also edit the post to include the airflow version that I am using) – aaron Jul 24 '17 at 21:51
  • That fixed my graph, @sdikby, thanks for the help. I voted for your answer and included the Airflow version number above (1.8.0). If you're interested [I've posted another question here about exchanging data between tasks](https://stackoverflow.com/questions/45314174/how-to-dynamically-iterate-over-the-output-of-an-upstream-task-to-create-paralle). – aaron Jul 25 '17 at 21:58