1

I need to access XCOM values outside the operator, it can be done using variables, but i want to avoid it.

I am pushing the xcom values from one operator and then a funcction to fetch xcom values using getone/getmany function, but it is returning null/none.

even chatgpt is not providing correct result.

please have look at the below code and help fix it?


from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import timedelta, datetime
from airflow.operators.bash import BashOperator
from airflow.models.xcom import XCom
from airflow.utils.session import provide_session

args = {
    'owner': 'airflow',
    'provide_context': True
}


from airflow.models import DagRun, Variable
from airflow.utils.dates import days_ago


def last_task_func(**kwargs):
    value1 = kwargs["value1"]
    value2 = kwargs["value2"]
    value3 = kwargs["value3"]
    value4 = kwargs["value4"]
    value5 = kwargs["value5"]
    final_value=f"{value1}-{value2}-{value3}-{value4}-{value5}"
    print(f"value -- {final_value}")
    return final_value


def my_previous_task_func(**kwargs):
    ti = kwargs["ti"]
    ti.xcom_push(key="key1", value='xcom_value 1')
    ti.xcom_push(key="key2", value='xcom_value 2')
    ti.xcom_push(key="key3", value='xcom_value 1')


dag_id = "test_xcom_dag"


@provide_session
def get_xcom_value(session):
    # Get the current DAG run ID from the Variable store
    current_dag_run_id = "{{run_id}}"
    # Variable.get("dag_run").get(dag_id)

    # Get the current DAG run object
    # current_dag_run = DagRun.find(dag_id=dag_id, run_id=current_dag_run_id)

    # Get the XCom values for the current DAG run
    # xcom_keys = ['key1', 'key2', 'key3']
    xcom_keys = 'key1'
    t_id='my_previous_task'
    # xcom_values = XCom.get_many(run_id= current_dag_run_id, key=None, task_ids=None, dag_ids=dag_id,session=session)
    
    xcom_values = XCom.get_one(key=xcom_keys, task_id=t_id, dag_id=dag_id, run_id= current_dag_run_id,session=session)
    # Use the XCom values
    value1 = xcom_values
    # value2 = xcom_values['key2']
    # value3 = xcom_values['key3']
    value2 = 'val 2'
    value3 = 'val 3'
    print(value1, value2, value3)
    # xcom = value1.with_entities(XCom.value).first()
    return xcom_values
    # if xcom:
    #    return XCom.deserialize_value(xcom)


with DAG(
    dag_id,
    start_date=datetime(2023, 1, 16),
    schedule_interval=None,
    max_active_runs=1,
    description='description',
    catchup=False,
    default_args=args,
    ) as dag:

    my_previous_task = PythonOperator(
        task_id='my_previous_task',
        python_callable=my_previous_task_func,
        provide_context=True,
        do_xcom_push=True
    )

    current_dag_run_id = "{{run_id}}"
    xcom_keys = 'key1'
    t_id='my_previous_task'
    
    value1=get_xcom_value()

    last_task = PythonOperator(
        task_id='last_task',
        python_callable=last_task_func,
        op_kwargs={"value1": value1,"value2":current_dag_run_id, "value3":dag_id, "value4":xcom_keys, "value5":t_id},
        provide_context=True,
        do_xcom_push=True
    )

    my_previous_task >> last_task
  • Do not try to access Xcoms outside of operator scope. Yes you can find a way to work around by directly querying the DB but this is might/will overwhelm the DB with queries as DAGs are parsed every 30 seconds. Don't go there. – Elad Kalif Mar 02 '23 at 17:08
  • Thanks @EladKalif, for the response, but i have the requirement to use the XCOM outside scope to generate dynamic tasks in a task group with help of the XCOM value from one task. – Ankit Gupta Mar 02 '23 at 17:47
  • This is not how it should be done. Check dynamic task mapping feature. https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html – Elad Kalif Mar 02 '23 at 19:06
  • are you trying to get xcom value within `last_task_func`? The code looks like you are getting xcom value and passing to `last_task_func` but the `last_task_func` already have access to xcom natively within the function. `kwargs['ti'].xcom_pull(task_ids='my_previous_task', key='key1')` – Emma Mar 03 '23 at 17:37

0 Answers0