Usually the jinja template is passed inside double quotes to any operator in Airflow as shown below code,
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from datetime import datetime
default_args = {
'start_date': datetime(2023, 3, 16)
}
with DAG('my_dag', default_args=default_args, schedule_interval=None) as dag:
task1 = DatabricksSubmitRunOperator(
task_id='task1',
databricks_conn_id='databricks_default',
existing_cluster_id="xyz-cluster-id",
run_name='My Run',
wait_for_completion=True,
execution_timeout="{{ ti.xcom_pull(task_ids='previous_task_id', key='my_timeout') }}"
)
task2 = ...
task1 >> task2
But as execution_timeout requires timedelta above code throwing error
ValueError: execution_timeout must be timedelta object but passed as type <class 'str'>
So I feel it should be passed as below,
task1 = DatabricksSubmitRunOperator(
task_id='task1',
databricks_conn_id='databricks_default',
existing_cluster_id="xyz-cluster-id",
run_name='My Run',
wait_for_completion=True,
execution_timeout=timedelta(minutes="{{ ti.xcom_pull(task_ids='previous_task_id', key='my_timeout') }}")
)
But now it's giving error as unsupported type for timedelta minutes component: str
I know that exection_timeout is not templated field so I have already creating a wrapper over DataBricksSubmitRunOperator to allow templating for exection_timeout field
Note: Above code is for reference purpose only, the execution_timeout would applicable for any custom operator