
I know this question has been asked before, but none of the answers have worked for me. I'm beginning to go slightly crazy! I'm very confused, so I would really appreciate some help.

I've got a DAG with a PythonOperator that runs a SQL query and outputs the result to .csv. A second operator just returns True so that the DAG has more than one task. I can't seem to access the ds variable within my function, which I need in order to pass it to the query.

from airflow.models import Variable, DAG
from airflow.hooks import HttpHook, PostgresHook
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
import json


sql_path = Variable.get("sql_path")
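# NOTE: "ds" is a template variable available inside a task context; it is
# not a stored Airflow Variable, so the next two lookups raise the KeyError
# shown in the logs below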
date = Variable.get("ds")
first_date = Variable.get("ds")

print date

def get_redshift_data(ds, **kwargs):
    pg_hook = PostgresHook(postgres_conn_id='redshift')
    params = {'window_start_date': date, 'window_end_date': first_date}
    with open(sql_path + "/native.sql") as f:
        sql_file = f.read() % params
    df2 = pg_hook.get_pandas_df(sql_file)
    df2.to_csv("test_1.csv", encoding="utf-8")

def print_test(ds, **kwargs):
    return True

args = {
    'owner': 'Bob',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define DAG
dag = DAG(dag_id='native_etl',
          default_args=args,
          schedule_interval='0 * * * 1,2,3,4,5',
          dagrun_timeout=timedelta(seconds=30))

# Task 1: run native query with date parameters and output to file

get_redshift_native = PythonOperator(
    task_id='native_etl',
    provide_context=True,
    python_callable=get_redshift_data,
    dag=dag)

# Task 2: print test

get_test = PythonOperator(
    task_id='native_test',
    provide_context=True,
    python_callable=print_test,
    dag=dag)

get_redshift_native >> get_test

if __name__ == "__main__":
    dag.cli()

When I look at the logs, I get the following:

raise KeyError('Variable {} does not exist'.format(key))

I've also tried to access the variable via kwargs["ds"] and {{ ds }}, both inside and outside the operator.

The query is fine and contains the templated text:

WHERE trunc(pd.server_timestamp) between '%(window_start_date)s' AND '%(window_end_date)s'
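
For what it's worth, the %(name)s placeholders are filled by plain Python dict-based %-formatting, e.g. (made-up dates):

params = {'window_start_date': '2018-08-20', 'window_end_date': '2018-08-24'}
clause = "WHERE trunc(pd.server_timestamp) between '%(window_start_date)s' AND '%(window_end_date)s'" % params
# -> WHERE trunc(pd.server_timestamp) between '2018-08-20' AND '2018-08-24'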
bamdan
    Use this approach to push the context into the Operator and you are done: https://stackoverflow.com/questions/50093718/airflow-python-script-with-execution-date-in-op-kwargs/50095130#50095130 – tobi6 Aug 24 '18 at 06:45
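
If I follow that suggestion correctly, it amounts to reading the execution date from the context that provide_context=True passes in, rather than from Variable.get; a minimal sketch:

def get_redshift_data(ds, **kwargs):
    # with provide_context=True, Airflow calls the function with the template
    # context, so ds arrives as the execution date ('YYYY-MM-DD' string)
    params = {'window_start_date': ds, 'window_end_date': ds}
    ...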

1 Answer


You should use templates_dict to pass the ds template into your PythonOperator; templates_dict is one of the operator's template_fields, so Jinja renders its values before your callable runs.

https://github.com/apache/incubator-airflow/blob/master/airflow/operators/python_operator.py#L56

For example, if I wanted to pass execution_date to a PythonOperator:

def transform_py(**kwargs):
    today = kwargs.get('templates_dict').get('today', None)
    ...

with dag:
    today = "{{ ds_nodash }}"

    transform = PythonOperator(
        task_id='test_date',
        python_callable=transform_py,
        templates_dict={'today': today},
        provide_context=True)
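
Applied to the code in the question, a rough sketch might look like this (untested; it reuses the sql_path Variable and the native.sql file from the question):

def get_redshift_data(**kwargs):
    # both dates arrive via templates_dict, already rendered by Jinja
    start = kwargs['templates_dict']['window_start_date']
    end = kwargs['templates_dict']['window_end_date']
    pg_hook = PostgresHook(postgres_conn_id='redshift')
    with open(sql_path + "/native.sql") as f:
        sql_file = f.read() % {'window_start_date': start, 'window_end_date': end}
    pg_hook.get_pandas_df(sql_file).to_csv("test_1.csv", encoding="utf-8")

get_redshift_native = PythonOperator(
    task_id='native_etl',
    python_callable=get_redshift_data,
    templates_dict={'window_start_date': "{{ ds }}",
                    'window_end_date': "{{ ds }}"},
    provide_context=True,
    dag=dag)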
Viraj Parekh
    Thanks very much. This helped me understand where I was going wrong. In the end I used the solution outlined by @WillFitzgerald here: https://stackoverflow.com/questions/41749974/airflow-using-template-files-for-pythonoperator. It really isn't very clear in what context the variables are available, and that some operators can access the variables readily (PostgresOperator) whereas others cannot, as in this case. – bamdan Aug 24 '18 at 09:57