Currently I have some DAGs scheduled every 15 minutes, each running only a DataflowTemplateOperator. I create the DAGs in a loop using a "new_dag" function:
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators import dataflow_operator

# config and get_last_quarter() are defined elsewhere in my project


def new_dag(job):
    default_args = {
        'start_date': datetime(2020, 3, 11),
        'retries': 0,
        'dataflow_default_options': {
            'project': config.PROJECT,
            'region': config.REGION,
            'zone': config.ZONE,
            'tempLocation': config.GS_TEMP_LOCATION,
        }
    }
    dag = DAG(dag_id=job,
              schedule_interval=config.MIN15_CRON,
              catchup=False,
              default_args=default_args)
    with dag:
        quarter_start, quarter_end = get_last_quarter()
        dataflow_batch = dataflow_operator.DataflowTemplateOperator(
            task_id=job,
            template=config.GS_TEMPLATE_LOCATION,
            parameters={"startTs": quarter_start, "endTs": quarter_end}
        )
    return dag
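For context, the loop that registers the DAGs is roughly like the following sketch (config.JOBS is just a placeholder for my real list of job names):

# Sketch of how the DAGs are registered; config.JOBS is a placeholder
for job in config.JOBS:
    globals()[job] = new_dag(job)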
My custom function "get_last_quarter" computes two timestamps starting from datetime.now() (a rough sketch follows the example below). For example, if now() gives "2020-03-16 18:33:00", my function produces the time interval for the previous quarter of an hour:
- 2020-03-16 18:15:00
- 2020-03-16 18:30:00
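A minimal sketch of what get_last_quarter does (my real implementation may differ slightly):

from datetime import datetime, timedelta

def get_last_quarter():
    # Round datetime.now() down to the last completed 15-minute boundary
    now = datetime.now()
    end = now.replace(minute=now.minute - now.minute % 15, second=0, microsecond=0)
    start = end - timedelta(minutes=15)
    # e.g. for 18:33:00 this returns ("... 18:15:00", "... 18:30:00")
    fmt = "%Y-%m-%d %H:%M:%S"
    return start.strftime(fmt), end.strftime(fmt)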
This code works, but I would like to use "execution_date" or "ts", reading the timestamp from the Airflow macros instead of datetime.now().
How can I do it? Can I read a macro as a Python variable inside the "with dag" block but outside an operator? https://airflow.apache.org/docs/stable/macros.html
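To make it concrete, this is the kind of thing I am hoping to write inside the "with dag" block (I do not know whether the "parameters" field is templated, or whether the macros would actually be rendered here):

# What I would like: use Airflow macros instead of datetime.now()
# (not sure this is valid, the Jinja expressions are what I hope gets rendered)
dataflow_batch = dataflow_operator.DataflowTemplateOperator(
    task_id=job,
    template=config.GS_TEMPLATE_LOCATION,
    parameters={
        "startTs": "{{ execution_date.strftime('%Y-%m-%d %H:%M:%S') }}",
        "endTs": "{{ next_execution_date.strftime('%Y-%m-%d %H:%M:%S') }}",
    }
)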