I need to access XCOM values outside the operator, it can be done using variables, but i want to avoid it.
I am pushing the xcom values from one operator and then a funcction to fetch xcom values using getone/getmany function, but it is returning null/none.
even chatgpt is not providing correct result.
please have look at the below code and help fix it?
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import timedelta, datetime
from airflow.operators.bash import BashOperator
from airflow.models.xcom import XCom
from airflow.utils.session import provide_session
args = {
'owner': 'airflow',
'provide_context': True
}
from airflow.models import DagRun, Variable
from airflow.utils.dates import days_ago
def last_task_func(**kwargs):
value1 = kwargs["value1"]
value2 = kwargs["value2"]
value3 = kwargs["value3"]
value4 = kwargs["value4"]
value5 = kwargs["value5"]
final_value=f"{value1}-{value2}-{value3}-{value4}-{value5}"
print(f"value -- {final_value}")
return final_value
def my_previous_task_func(**kwargs):
ti = kwargs["ti"]
ti.xcom_push(key="key1", value='xcom_value 1')
ti.xcom_push(key="key2", value='xcom_value 2')
ti.xcom_push(key="key3", value='xcom_value 1')
dag_id = "test_xcom_dag"
@provide_session
def get_xcom_value(session):
# Get the current DAG run ID from the Variable store
current_dag_run_id = "{{run_id}}"
# Variable.get("dag_run").get(dag_id)
# Get the current DAG run object
# current_dag_run = DagRun.find(dag_id=dag_id, run_id=current_dag_run_id)
# Get the XCom values for the current DAG run
# xcom_keys = ['key1', 'key2', 'key3']
xcom_keys = 'key1'
t_id='my_previous_task'
# xcom_values = XCom.get_many(run_id= current_dag_run_id, key=None, task_ids=None, dag_ids=dag_id,session=session)
xcom_values = XCom.get_one(key=xcom_keys, task_id=t_id, dag_id=dag_id, run_id= current_dag_run_id,session=session)
# Use the XCom values
value1 = xcom_values
# value2 = xcom_values['key2']
# value3 = xcom_values['key3']
value2 = 'val 2'
value3 = 'val 3'
print(value1, value2, value3)
# xcom = value1.with_entities(XCom.value).first()
return xcom_values
# if xcom:
# return XCom.deserialize_value(xcom)
with DAG(
dag_id,
start_date=datetime(2023, 1, 16),
schedule_interval=None,
max_active_runs=1,
description='description',
catchup=False,
default_args=args,
) as dag:
my_previous_task = PythonOperator(
task_id='my_previous_task',
python_callable=my_previous_task_func,
provide_context=True,
do_xcom_push=True
)
current_dag_run_id = "{{run_id}}"
xcom_keys = 'key1'
t_id='my_previous_task'
value1=get_xcom_value()
last_task = PythonOperator(
task_id='last_task',
python_callable=last_task_func,
op_kwargs={"value1": value1,"value2":current_dag_run_id, "value3":dag_id, "value4":xcom_keys, "value5":t_id},
provide_context=True,
do_xcom_push=True
)
my_previous_task >> last_task