I have a DAG that uses KubernetesPodOperator
and the task get_train_test_model_task_count
in the DAG pushes an xcom variable and I want to use it in the following tasks.
run_this = BashOperator(
task_id="also_run_this",
bash_command='echo "ti_key={{ ti.xcom_pull(task_ids=\"get_train_test_model_task_count\", key=\"return_value\")[\"models_count\"] }}"',
)
The above DAG task works and it prints the value as ti_key=24
.
I want the same value to be used as a variable,
with TaskGroup("train_test_model_config") as train_test_model_config:
models_count = "{{ ti.xcom_pull(task_ids=\"get_train_test_model_task_count\", key=\"return_value\")[\"models_count\"] }}"
print(models_count)
for task_num in range(0, int(models_count)):
generate_train_test_model_config_task(task_num)
int(models_count)
doesnot work, by throwing the error -
ValueError: invalid literal for int() with base 10: '{{ ti.xcom_pull(task_ids="get_train_test_model_task_count", key="return_value")["models_count"] }}'
And the generate_train_test_model_config_task
looks as below:
def generate_train_test_model_config_task(task_num):
task = KubernetesPodOperator(
name=f"train_test_model_config_{task_num}",
image=build_model_image,
labels=labels,
cmds=[
"python3",
"-m",
"src.models.train_test_model_config",
"--tenant=neu",
f"--model_tag_id={task_num}",
"--line_plan={{ ti.xcom_pull(key=\"file_name\", task_ids=\"extract_file_name\") }}",
"--staging_bucket=cs-us-ds"
],
task_id=f"train_test_model_config_{task_num}",
do_xcom_push=False,
namespace="airflow",
service_account_name="airflow-worker",
get_logs=True,
startup_timeout_seconds=300,
container_resources={"request_memory": "29G", "request_cpu": "7000m"},
node_selector={"cloud.google.com/gke-nodepool": NODE_POOL},
tolerations=[
{
"key": NODE_POOL,
"operator": "Equal",
"value": "true",
"effect": "NoSchedule",
}
],
)
return task