I know that the question has been asked before but none of the answers have answered it. I'm beginning to go slightly crazy! I'm very confused so would really appreciate some help.
I've got a DAG with a python operator which runs a SQL query and outputs to .csv. A second operator just returns true in order to make a DAG. I can't seem to access the ds variable within my function. I want to do this in order to pass to the query.
from airflow.models import Variable, DAG
from airflow.hooks import HttpHook, PostgresHook
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
import json
sql_path = Variable.get("sql_path")
date = Variable.get("ds")
first_date = Variable.get("ds")
print date
def get_redshift_data(ds,**kwargs):
pg_hook = PostgresHook(postgres_conn_id='redshift')
params = {'window_start_date':date,'window_end_date':first_date}
with open(sql_path+"/native.sql") as f:
sql_file = f.read() % (params)
df2 = pg_hook.get_pandas_df(sql_file)
df2.to_csv("test_1.csv", encoding = "utf-8")
def print_test(ds, **kwargs):
return True
args = {
'owner': 'Bob',
'depends_on_past': False,
'start_date': datetime.utcnow(),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
#Define DAG
dag = DAG(dag_id='native_etl',
default_args=args,
schedule_interval='0 * * * 1,2,3,4,5',
dagrun_timeout=timedelta(seconds=30))
#Task 1 run native query with date parameters and output to file
get_redshift_native = PythonOperator(
task_id='native_etl',
provide_context=True,
python_callable=get_redshift_data,
dag=dag
)
#Task 2 print test
get_test = PythonOperator(
task_id='native_test',
provide_context=True,
python_callable=print_test,
dag=dag
)
get_redshift_native >> get_test
if __name__ == "__main__":
dag.cli()
When I look at the logs I get the following,
raise KeyError('Variable {} does not exist'.format(key))`
I've also tried to access the variable via kwargs["ds"] and {{ ds }} within and outside of the operator.
The query is fine and contains the templated text:
WHERE trunc(pd.server_timestamp) between '%(window_start_date)s' AND '%(window_end_date)s'