Usually a Jinja template is passed as a quoted string to an operator in Airflow, as in the code below:

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from datetime import datetime

default_args = {
    'start_date': datetime(2023, 3, 16)
}

with DAG('my_dag', default_args=default_args, schedule_interval=None) as dag:
    
    task1 = DatabricksSubmitRunOperator(
        task_id='task1',
        databricks_conn_id='databricks_default',
        existing_cluster_id="xyz-cluster-id",
        run_name='My Run',
        wait_for_completion=True,
        execution_timeout="{{ ti.xcom_pull(task_ids='previous_task_id', key='my_timeout') }}"
    )

    task2 = ...

    task1 >> task2

But since execution_timeout requires a timedelta, the above code throws the error ValueError: execution_timeout must be timedelta object but passed as type <class 'str'>

So I think it should be passed as below:

task1 = DatabricksSubmitRunOperator(
    task_id='task1',
    databricks_conn_id='databricks_default',
    existing_cluster_id="xyz-cluster-id",
    run_name='My Run',
    wait_for_completion=True,
    execution_timeout=timedelta(minutes="{{ ti.xcom_pull(task_ids='previous_task_id', key='my_timeout') }}")
)

But now it gives the error: unsupported type for timedelta minutes component: str

I know that execution_timeout is not a templated field, so I have already created a wrapper over DatabricksSubmitRunOperator to allow templating for the execution_timeout field.

Note: The above code is for reference only; the same execution_timeout issue would apply to any custom operator.

sumitkanoje
  • Does this answer your question? [Format jinja template as INT in operator parameter in Airflow](https://stackoverflow.com/questions/64235496/format-jinja-template-as-int-in-operator-parameter-in-airflow) – Kombajn zbożowy Mar 16 '23 at 20:27
  • Nope, I am aware of this thing – sumitkanoje Mar 17 '23 at 06:34
  • And `render_template_as_native_obj=True` on DAG plus `execution_timeout=timedelta(minutes="{{ ti.xcom_pull(task_ids='previous_task_id', key='my_timeout') | int }}")` doesn't work for you? – Kombajn zbożowy Mar 17 '23 at 07:29

1 Answer

Jinja templates can only be used as arguments when the field is designated as a template field for the given operator. Unfortunately, parameters needed by the Scheduler (effectively, parameters originating directly from the BaseOperator) are not templateable. This means templating execution_timeout is not possible in Airflow.
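
As a rough illustration of what "designated as a template field" means, here is a toy model that uses only the standard library. ToyOperator and its render method are hypothetical stand-ins, not Airflow's BaseOperator, and plain string replacement is used in place of Jinja. The only point is that attributes not named in template_fields are never rendered.

```python
from dataclasses import dataclass
from typing import Any, ClassVar


@dataclass
class ToyOperator:
    """Hypothetical stand-in for an operator; NOT Airflow's BaseOperator."""

    # Only attributes named here are rendered before execution.
    template_fields: ClassVar[tuple] = ("run_name",)

    run_name: str
    execution_timeout: Any = None

    def render(self, context: dict) -> None:
        # Simplified rendering: replace "{{ key }}" placeholders by plain
        # string substitution (real Airflow uses Jinja for this step).
        for field in self.template_fields:
            value = getattr(self, field)
            for key, replacement in context.items():
                value = value.replace("{{ " + key + " }}", str(replacement))
            setattr(self, field, value)


op = ToyOperator(run_name="run-{{ ds }}", execution_timeout="{{ my_timeout }}")
op.render({"ds": "2023-03-16", "my_timeout": 30})

print(op.run_name)           # rendered, because it is listed in template_fields
print(op.execution_timeout)  # still the raw placeholder string
```

In this sketch execution_timeout keeps its placeholder text because the render step never looks at it, which mirrors why the string reaches the operator unchanged in the question.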

Below is a list of parameters that cannot be templated in Airflow.

'start_date'
'doc'
'executor_config'
'pre_execute'
'email'
'retry_exponential_backoff'
'depends_on_past'
'inlets'
'wait_for_downstream'
'email_on_failure'
'outlets'
'doc_json'
'execution_timeout'
'task_id'
'sla'
'on_success_callback'
'trigger_rule'
'ignore_first_depends_on_past'
'doc_md'
'pool'
'doc_rst'
'on_failure_callback'
'email_on_retry'
'weight_rule'
'priority_weight'
'pool_slots'
'run_as_user'
'task_group'
'doc_yaml'
'on_retry_callback'
'do_xcom_push'
'max_retry_delay'
'retries'
'owner'
'params'
'default_args'
'queue'
'resources'
'task_concurrency'
'max_active_tis_per_dag'
'wait_for_past_depends_before_skipping'
'retry_delay'
'on_execute_callback'
'dag'
'end_date'
'post_execute'
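
The second attempt in the question fails for a related reason: timedelta(...) is ordinary Python that runs when the DAG file is parsed, before any template rendering, so it receives the raw template string. A minimal sketch using only the standard library (the rendered value "30" is an assumption for illustration):

```python
from datetime import timedelta

# The template is just a string while the DAG file is being parsed.
template = "{{ ti.xcom_pull(task_ids='previous_task_id', key='my_timeout') }}"

try:
    timedelta(minutes=template)
    error = None
except TypeError as exc:
    # timedelta rejects a str argument immediately, at construction time.
    error = str(exc)

print(error)

# Assumed rendered value: even after rendering, the result would be a string
# and would still need an explicit conversion before building a timedelta.
rendered = "30"
timeout = timedelta(minutes=int(rendered))
print(timeout)
```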
Josh Fell