I'm trying to render a Jinja template parameter as an integer so that I can pass it to an operator that expects an int (either a custom operator or PythonOperator), but I can't get it to work.

See the sample DAG below. I'm using the built-in Jinja filter | int, but it has no effect: the type remains <class 'str'>.

I'm still new to Airflow, but from what I've read about how Jinja and Airflow work, I don't think this is possible. I see two main workarounds:

  • Change the operator parameter to expect string and handle the conversion underneath.
  • Handle the conversion in a separate PythonOperator that converts the string to an int and exports it via XCom or the task context. (I think this will work, but I'm not sure.)

Please let me know of any other workarounds.

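from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.10.x import path
from airflow.utils.dates import days_ago


def greet(mystr):
    print(mystr)
    print(type(mystr))  # prints <class 'str'> even with the | int filter

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(2)
}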

dag = DAG(
    'template_dag',
    default_args=default_args,
    description='template',
    schedule_interval='0 13 * * *'
)


with dag:

    # foo = "{{ var.value.my_custom_var | int }}"  # from variable
    foo = "{{ execution_date.int_timestamp | int }}"  # built in macro

    # could be MyCustomOperator
    opr_greet = PythonOperator(task_id='greet',
                               python_callable=greet,
                               op_kwargs={'mystr': foo}
                               )

    opr_greet

Airflow 1.10.11

Gabe

2 Answers

Updated answer:

As of Airflow 2.1, you can pass render_template_as_native_obj=True to the DAG, and Airflow will render templates to native Python types (dict, int, etc.) instead of strings. See this pull request.

dag = DAG(
    dag_id="example_template_as_python_object",
    schedule_interval=None,
    start_date=days_ago(2),
    render_template_as_native_obj=True,
)
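
To see why that flag changes the type, here is a minimal sketch in plain Jinja2, outside Airflow. It assumes the flag makes Airflow render with Jinja's NativeEnvironment rather than the default Environment; the variable name `value` and the sample input are made up for illustration.

```python
from jinja2 import Environment
from jinja2.nativetypes import NativeEnvironment

template = "{{ value | int }}"

# Default environment: the rendered result is always text.
as_text = Environment().from_string(template).render(value="42")

# Native environment: the result keeps the Python type the expression produced.
as_native = NativeEnvironment().from_string(template).render(value="42")

print(type(as_text), type(as_native))
```

The | int filter runs in both cases; only the default environment turns its result back into a string when the template is rendered.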

Old answer for prior versions:

I found a related question that provides the best workaround, IMO.

Airflow xcom pull only returns string

The trick is to use a PythonOperator, do the datatype conversion there, and then call the main operator with the converted parameter. Below is an example that converts a JSON string to a dict. The same approach applies to converting a string to an int, etc.

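import json

from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceImportOperator  # Airflow 1.10.x path
from airflow.operators.python_operator import PythonOperator

# GCP_PROJECT_ID and INSTANCE_NAME are assumed to be defined elsewhere in the DAG file.


def my_func(ds, **kwargs):
    ti = kwargs['ti']
    # The upstream task pushed a JSON string; parse it into a dict here.
    body = ti.xcom_pull(task_ids='previous_task_id')
    import_body = json.loads(body)
    op = CloudSqlInstanceImportOperator(
        project_id=GCP_PROJECT_ID,
        body=import_body,
        instance=INSTANCE_NAME,
        gcp_conn_id='postgres_default',
        task_id='sql_import_task',
        validate_body=True,
    )
    # Run the wrapped operator inside this task, passing the task context through.
    op.execute(context=kwargs)


# provide_context=True is needed on Airflow 1.x so the callable receives ds and ti.
p = PythonOperator(task_id='python_task', python_callable=my_func, provide_context=True)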
Gabe

I believe Jinja will always return a string: the template is a string, and substituting values into it produces another string.

If you are sure that foo is always an integer, you can do

opr_greet = PythonOperator(task_id='greet',
                           python_callable=greet,
                           op_kwargs={'mystr': int(foo)}
                           )

Update: it looks like Airflow uses the render method from Jinja2, which returns a Unicode string.

At this point, if you can modify greet, it is easier to convert the input parameter inside that function.
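
A minimal sketch of both points, runnable without Airflow: the int(foo) cast fails when the DAG file is parsed because foo is still the literal template text, while a cast inside the callable sees the rendered value. The sample timestamp is made up, and greet is called directly here to stand in for what PythonOperator would do at run time.

```python
TEMPLATE = "{{ execution_date.int_timestamp | int }}"

# At DAG-parse time the value is still the literal template text,
# so casting it in the DAG file fails before Jinja ever runs.
try:
    int(TEMPLATE)
    parse_time_cast_ok = True
except ValueError:
    parse_time_cast_ok = False


def greet(mystr):
    """Receive the rendered (string) value and cast it at run time."""
    value = int(mystr)
    print(value, type(value))
    return value


# Simulate what the callable receives after rendering (hypothetical timestamp).
result = greet("1602086400")
```

The cast inside greet raises ValueError if the rendered text is not a valid integer, so the task fails visibly instead of passing a bad value downstream.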

mucio
  • That doesn't work, unfortunately. When Airflow parses the DAG, it tries to convert the literal string `"{{ ... }}"` to int. – Gabe Oct 07 '20 at 17:38
  • You are right, it looks like the jinja2 module returns rendered templates as strings. – mucio Oct 07 '20 at 23:12