
Currently I have some DAGs scheduled every 15 minutes that run only a DataflowTemplateOperator. I create multiple DAGs in a loop using a "new_dag" function:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators import dataflow_operator

import config  # project-local settings module

def new_dag(job):
  default_args = {
    'start_date': datetime(2020, 3, 11),
    'retries': 0,
    'dataflow_default_options': {
        'project': config.PROJECT,
        'region': config.REGION,
        'zone': config.ZONE,
        'tempLocation': config.GS_TEMP_LOCATION,
    }
  }

  dag = DAG(dag_id=job,
            schedule_interval=config.MIN15_CRON,
            catchup=False,
            default_args=default_args)

  with dag:
    quarter_start, quarter_end = get_last_quarter()
    dataflow_batch = dataflow_operator.DataflowTemplateOperator(
        task_id=job,
        template=config.GS_TEMPLATE_LOCATION,
        parameters={"startTs": quarter_start, "endTs": quarter_end}
    )

  return dag

My custom function "get_last_quarter" computes two timestamps starting from datetime.now(). For example, if now() gives "2020-03-16 18:33:00" my function will produce a time interval for the previous quarter of an hour:

  • 2020-03-16 18:15:00
  • 2020-03-16 18:30:00
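A minimal sketch of such a function (the name "get_last_quarter" is from the question above; the rounding logic and output format are my own assumptions):

```python
from datetime import datetime, timedelta

def get_last_quarter(now=None):
    """Return (start, end) strings for the most recent completed quarter-hour."""
    now = now or datetime.now()
    # Round down to the current quarter-hour boundary...
    end = now.replace(minute=now.minute - now.minute % 15,
                      second=0, microsecond=0)
    # ...then step back 15 minutes for the interval start.
    start = end - timedelta(minutes=15)
    fmt = "%Y-%m-%d %H:%M:%S"
    return start.strftime(fmt), end.strftime(fmt)
```

For example, `get_last_quarter(datetime(2020, 3, 16, 18, 33))` yields the interval shown above.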

This code works, but I would like to use "execution_date" or "ts", reading the timestamp from Airflow macros instead of datetime.now().

How can I do it? Can I read macros as a Python variable inside "with dag" but outside an operator? https://airflow.apache.org/docs/stable/macros.html

Diego
  • Are you using Cloud Composer or Airflow on-premises? – rmesteves Mar 17 '20 at 08:26
  • I'm using Cloud Composer – Diego Mar 17 '20 at 08:29
  • Did you take a look in this answer? Please let me know if its related to your problem. https://stackoverflow.com/questions/43149276/accessing-the-ds-variable-in-airflow – rmesteves Mar 17 '20 at 08:47
  • Yes! But I tried with no luck. In fact "ds and macros variables can only be accessed through template as they only exists during execution and not during python code parsing" So maybe I must evaluate also different solutions and approaches. – Diego Mar 17 '20 at 09:01
  • Is accessing it through a template not an option to you? – rmesteves Mar 17 '20 at 09:10
  • I just tried it using PythonOperator before my DataflowTemplateOperator, but something is not working. kwargs passed to my function seem to be empty also if I use "provide_context=True". Field not found "kwargs['execution_date']" – Diego Mar 17 '20 at 09:25
  • This other question seems to be related to your problem too https://stackoverflow.com/questions/46808531/how-can-i-get-execution-date-in-dag-the-outside-of-operator – rmesteves Mar 17 '20 at 09:37

1 Answer


I found a solution by adding a PythonOperator before the DataflowTemplateOperator:

    get_time = PythonOperator(
        task_id="get_time",
        python_callable=get_last_quarter,
        provide_context=True
    )

    dataflow_batch = dataflow_operator.DataflowTemplateOperator(
        task_id=job,
        template=config.GS_TEMPLATE_LOCATION,
        parameters={"inputStartTs": '{{ ti.xcom_pull("get_time")[0] }}',
                    "inputEndTs": '{{ ti.xcom_pull("get_time")[1] }}'}
    )

    # get_time must run first so its XCom value exists when the templates render
    get_time >> dataflow_batch

To pass parameters between operators you can use XCom, accessed through "ti" (the task instance) in templates. The Python callable "get_last_quarter" can simply return a value (in my case a tuple of two values), and Airflow pushes it to XCom automatically under the default "return_value" key.
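Since the PythonOperator is called with provide_context=True, the callable receives "execution_date" in its kwargs, so the interval can be derived from Airflow's logical date instead of datetime.now(). A sketch of that variant (the rounding logic and output format are my own assumptions):

```python
from datetime import datetime, timedelta

def get_last_quarter(**kwargs):
    """Compute the previous quarter-hour interval from the Airflow context."""
    # execution_date is injected by Airflow when provide_context=True
    end = kwargs["execution_date"].replace(second=0, microsecond=0)
    end = end.replace(minute=end.minute - end.minute % 15)
    start = end - timedelta(minutes=15)
    fmt = "%Y-%m-%d %H:%M:%S"
    # The returned tuple is pushed to XCom under the "return_value" key
    return start.strftime(fmt), end.strftime(fmt)
```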

Diego