Updating conf after a dag run has been created isn't as straight forward as reading from conf, because conf is read from the dag_run metadata table whenever it's used after a dag run has been created. While Variables have methods to both write to and read from a metadata table, dag runs only let you read.
I agree that Variables are a useful tool, but when you have k=v pairs that you only want to use for a single run, it gets complicated and messy.
Below is an operator that will let you update a dag_run's conf after instantiation (tested in v1.10.10):
#! /usr/bin/env python3
"""Operator to overwrite a dag run's conf after creation."""
import os
from airflow.models import BaseOperator
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.operator_helpers import context_to_airflow_vars
class UpdateConfOperator(BaseOperator):
"""Updates an existing DagRun's conf with `given_conf`.
Args:
given_conf: A dictionary of k:v values to update a DagRun's conf with. Templated.
replace: Whether or not `given_conf` should replace conf (True)
or be used to update the existing conf (False).
Defaults to True.
"""
template_fields = ("given_conf",)
ui_color = "#ffefeb"
@apply_defaults
def __init__(self, given_conf: Dict, replace: bool = True, *args, **kwargs):
super().__init__(*args, **kwargs)
self.given_conf = given_conf
self.replace = replace
@staticmethod
def update_conf(given_conf: Dict, replace: bool = True, **context) -> None:
@provide_session
def save_to_db(dag_run, session):
session.add(dag_run)
session.commit()
dag_run.refresh_from_db()
dag_run = context["dag_run"]
# When there's no conf provided,
# conf will be None if scheduled or {} if manually triggered
if replace or not dag_run.conf:
dag_run.conf = given_conf
elif dag_run.conf:
# Note: dag_run.conf.update(given_conf) doesn't work
dag_run.conf = {**dag_run.conf, **given_conf}
save_to_db(dag_run)
def execute(self, context):
# Export context to make it available for callables to use.
airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
self.log.debug(
"Exporting the following env vars:\n%s",
"\n".join(["{}={}".format(k, v) for k, v in airflow_context_vars.items()]),
)
os.environ.update(airflow_context_vars)
self.update_conf(given_conf=self.given_conf, replace=self.replace, **context)
Example usage:
CONF = {"field1": 3, "field2": 4}
with DAG(
"some_dag",
# schedule_interval="*/1 * * * *",
schedule_interval=None,
max_active_runs=1,
catchup=False,
) as dag:
t_update_conf = UpdateConfOperator(
task_id="update_conf", given_conf=CONF,
)
t_print_conf = BashOperator(
task_id="print_conf",
bash_command="echo {{ dag_run['conf'] }}",
)
t_update_conf >> t_print_conf