
I have a huge JSON file in XCom that I no longer need once the DAG execution is finished, but I still see the XCom object in the UI with all the data. Is there any way to delete the XCom programmatically once the DAG run is finished?

Thank you

vijay krishna

5 Answers


You can perform the cleanup programmatically through SQLAlchemy, so your solution won't break if the database structure changes:

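from airflow.utils.db import provide_session  # Airflow 2.x: airflow.utils.session
from airflow.models import XCom

@provide_session
def cleanup_xcom(session=None):
    # Delete every XCom row of one DAG; provide_session commits if it opened the session
    session.query(XCom).filter(XCom.dag_id == "your dag id").delete()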
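For intuition, the ORM query above boils down to a single `DELETE ... WHERE dag_id = ?` statement against the `xcom` table. The sketch below reproduces that with the standard-library `sqlite3` module and an in-memory database. The `xcom` table here is a deliberately simplified, hypothetical schema for illustration, not Airflow's real one:

```python
import sqlite3


def make_demo_db():
    """Create an in-memory DB with a simplified, hypothetical xcom table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE xcom (dag_id TEXT, task_id TEXT, key TEXT, value TEXT)")
    conn.executemany(
        "INSERT INTO xcom VALUES (?, ?, ?, ?)",
        [
            ("big_json_dag", "extract", "return_value", '{"huge": "payload"}'),
            ("big_json_dag", "transform", "return_value", '{"more": "data"}'),
            ("other_dag", "load", "return_value", '"keep me"'),
        ],
    )
    conn.commit()
    return conn


def cleanup_xcom(conn, dag_id):
    """Delete every XCom row for one DAG and return how many rows were removed."""
    cur = conn.execute("DELETE FROM xcom WHERE dag_id = ?", (dag_id,))
    conn.commit()  # changes are only persisted once committed
    return cur.rowcount


conn = make_demo_db()
removed = cleanup_xcom(conn, "big_json_dag")
remaining = [row[0] for row in conn.execute("SELECT dag_id FROM xcom")]
print(removed, remaining)  # 2 ['other_dag']
```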

You can also purge old XCom data:

from airflow.utils.db import provide_session  # Airflow 2.x: airflow.utils.session
from airflow.models import XCom
from sqlalchemy import func

@provide_session
def cleanup_xcom(session=None):
    # Delete every XCom whose execution_date is on or before 2019-06-01
    session.query(XCom).filter(XCom.execution_date <= func.date('2019-06-01')).delete()
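The same idea works for age-based purging. The sketch below again uses `sqlite3` and a hypothetical simplified `xcom` table, and deletes rows whose `execution_date` is on or before a cutoff. Both sides are normalized with SQLite's `datetime()` function, so a bare date like `'2019-06-01'` compares as midnight rather than as a shorter string. Other databases have their own date functions, so treat the SQL as SQLite-specific:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, simplified schema: execution_date stored as ISO-8601 text.
conn.execute("CREATE TABLE xcom (dag_id TEXT, execution_date TEXT)")
conn.executemany(
    "INSERT INTO xcom VALUES (?, ?)",
    [
        ("my_dag", "2019-05-30 08:00:00"),
        ("my_dag", "2019-06-01 00:00:00"),
        ("my_dag", "2019-06-02 12:00:00"),
    ],
)


def purge_older_than(conn, cutoff):
    """Delete XCom rows with execution_date <= cutoff; return the number deleted."""
    cur = conn.execute(
        # datetime() normalizes both sides to 'YYYY-MM-DD HH:MM:SS' before comparing
        "DELETE FROM xcom WHERE datetime(execution_date) <= datetime(?)",
        (cutoff,),
    )
    conn.commit()
    return cur.rowcount


deleted = purge_older_than(conn, "2019-06-01")
left = [r[0] for r in conn.execute("SELECT execution_date FROM xcom")]
print(deleted, left)  # 2 ['2019-06-02 12:00:00']
```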

If you want to purge the XCom once the DAG is finished, I think the cleanest solution is to use the "on_success_callback" property of the DAG model class:

from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
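def cleanup_xcom(context, session=None):
    # The callback gets the run's context; the TaskInstance exposes dag_id as an attribute
    dag_id = context["ti"].dag_id
    # Deletes the XComs of every run of this DAG, not only the current one
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()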

dag = DAG( ...
    on_success_callback=cleanup_xcom,
)
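A DAG-level `on_success_callback` only runs when the DAG run succeeds. A failed run therefore leaves its XComs behind until a later successful run's callback deletes them, because this callback deletes by `dag_id` alone. The callback receives the run's context as a dictionary whose values are objects, so fields such as the DAG id are read as attributes. The stdlib-only simulation below illustrates both points. `FakeTaskInstance`, `finish_run` and the in-memory `xcom_store` are hypothetical stand-ins, not Airflow APIs:

```python
from dataclasses import dataclass


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance; only dag_id is modelled."""
    dag_id: str


# Hypothetical in-memory XCom store: (dag_id, key, value) tuples.
xcom_store = [("big_json_dag", "return_value", "huge json"), ("other_dag", "k", "v")]


def cleanup_xcom(context):
    """Callback: drop every stored XCom that belongs to the finished DAG."""
    dag_id = context["ti"].dag_id  # attribute access; context["ti"]["dag_id"] would fail
    xcom_store[:] = [row for row in xcom_store if row[0] != dag_id]


def finish_run(ti, succeeded, on_success_callback=None):
    """Simulate the end of a DAG run: the success callback fires only on success."""
    if succeeded and on_success_callback is not None:
        on_success_callback({"ti": ti})


ti = FakeTaskInstance("big_json_dag")
finish_run(ti, succeeded=False, on_success_callback=cleanup_xcom)
print(len(xcom_store))  # 2 (failed run: callback not called)
finish_run(ti, succeeded=True, on_success_callback=cleanup_xcom)
print(xcom_store)  # [('other_dag', 'k', 'v')]
```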
Gorka
  • This helped me fix a broken admin xcom page (it couldn't unserialize some value), and I didn't want to fuss with my docker container. I just threw this into a python operator that was already running and this got me my admin xcom page back :) – CTS_AE Jan 08 '20 at 02:02
  • I also agree this should be the proper way to do it, otherwise migrating the database will be an issue – Alejandro Kaspar Oct 01 '20 at 15:13

You have to add a task that runs a delete statement against your metadata database (SQLite, PostgreSQL, MySQL, ...) to remove the XCom once the DAG run is finished.

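from airflow.operators.postgres_operator import PostgresOperator

delete_xcom_task = PostgresOperator(
    task_id='delete-xcom-task',
    postgres_conn_id='airflow_db',
    # Jinja-templated SQL: dag.dag_id and ds are rendered at run time, and both are quoted
    sql="""
        delete from xcom
        where dag_id = '{{ dag.dag_id }}'
          and task_id = 'your_task_id'
          and date(execution_date) = '{{ ds }}'
    """,
    dag=dag)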

You can verify your query before you run the DAG:

Data Profiling -> Ad Hoc Query -> airflow_db -> query -> Run!
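One pitfall with a hand-written SQL string is that an unquoted `{{ ds }}` renders as something like `2019-06-01`, which SQL parses as integer arithmetic rather than a date. The sketch below demonstrates this with SQLite standing in for the metadata database and a hypothetical simplified `xcom` table. It then runs a quoted, date-normalized version of the delete using bound parameters. The DAG id, task id and date values are made up:

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Unquoted, the rendered date is just subtraction: 2019 - 6 - 1.
unquoted = conn.execute("SELECT 2019-06-01").fetchone()[0]
print(unquoted)  # 2012

# Hypothetical, simplified xcom table for the demo.
conn.execute("CREATE TABLE xcom (dag_id TEXT, task_id TEXT, execution_date TEXT)")
conn.executemany(
    "INSERT INTO xcom VALUES (?, ?, ?)",
    [
        ("my_dag", "your_task_id", "2019-06-01 00:00:00"),
        ("my_dag", "your_task_id", "2019-06-02 00:00:00"),
        ("my_dag", "another_task", "2019-06-01 00:00:00"),
    ],
)

# Values passed as parameters (so they are treated as strings), comparing calendar days.
cur = conn.execute(
    "DELETE FROM xcom WHERE dag_id = ? AND task_id = ? "
    "AND date(execution_date) = date(?)",
    ("my_dag", "your_task_id", "2019-06-01"),
)
conn.commit()
print(cur.rowcount)  # 1
```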


Omar14

Below is the code that worked for me. It deletes the XComs of all tasks in the DAG for the run's date (add a task_id condition to the SQL if only a specific task's XComs need to be deleted).

Note that dag_id is concatenated into the SQL string dynamically, and the date expressions must follow your database's SQL dialect.

from airflow.operators.postgres_operator import PostgresOperator

# Deletes the XComs of every task of this DAG whose execution date falls on ds
delete_xcom_task_inst = PostgresOperator(task_id='delete_xcom',
                                            postgres_conn_id='your_conn_id',
                                            sql="delete from xcom where dag_id= '"+dag.dag_id+"' and date(execution_date)=date('{{ ds }}')",
                                            dag=dag,
                                            )
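As the answer says, the statement can be narrowed to a single task by adding a `task_id` condition. The sketch below shows a small helper that builds the statement with that condition as optional. `build_xcom_delete` is a hypothetical name. The helper uses `?` placeholders, which is the `sqlite3` style (other drivers use e.g. `%s`). It is exercised against an in-memory SQLite table with a simplified, assumed schema:

```python
import sqlite3


def build_xcom_delete(dag_id, day, task_id=None):
    """Return (sql, params) deleting one DAG's XComs for one day, optionally one task."""
    sql = "DELETE FROM xcom WHERE dag_id = ? AND date(execution_date) = date(?)"
    params = [dag_id, day]
    if task_id is not None:
        sql += " AND task_id = ?"
        params.append(task_id)
    return sql, params


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE xcom (dag_id TEXT, task_id TEXT, execution_date TEXT)")
conn.executemany(
    "INSERT INTO xcom VALUES (?, ?, ?)",
    [
        ("my_dag", "a", "2019-06-01 00:00:00"),
        ("my_dag", "b", "2019-06-01 00:00:00"),
        ("my_dag", "a", "2019-06-02 00:00:00"),
    ],
)

# Only task "a" on 2019-06-01.
one_task = conn.execute(*build_xcom_delete("my_dag", "2019-06-01", task_id="a")).rowcount
# Every remaining task of the DAG on 2019-06-01.
all_tasks = conn.execute(*build_xcom_delete("my_dag", "2019-06-01")).rowcount
conn.commit()
print(one_task, all_tasks)  # 1 1
```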
Chandan

My solution to this problem is:

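from airflow.models import DAG, XCom
from airflow.operators.python_operator import PythonOperator  # Airflow 1.10.x path
from airflow.utils.db import provide_session

dag = DAG(...)

@provide_session
def cleanup_xcom(session=None, **context):
    # provide_session injects `session`; the task context arrives as keyword arguments
    dag = context["dag"]
    dag_id = dag.dag_id
    # Delete every XCom of this DAG (all runs)
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()

clean_xcom = PythonOperator(
    task_id="clean_xcom",
    python_callable=cleanup_xcom,
    provide_context=True,  # needed in Airflow 1.10.x to pass the context to the callable
    dag=dag
)

clean_xcom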

In Airflow 2.1.x, the code below does not seem to work:

from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(context, session=None):
    dag_id = context["ti"]["dag_id"]
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()

dag = DAG( ...
    on_success_callback=cleanup_xcom,
)

so change it to:

from airflow.models import DAG
from airflow.utils.session import provide_session
from airflow.models import XCom
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

with DAG(dag_id="cleanup_xcom_demo", schedule_interval=None, start_date=days_ago(2)) as dag:

    @provide_session
    def cleanup_xcom(session=None, **context):
        # The function must declare `session`; the task context arrives as **kwargs
        dag = context["dag"]
        dag_id = dag.dag_id
        # It will delete all XComs of the dag_id (every run, not only the current one)
        session.query(XCom).filter(XCom.dag_id == dag_id).delete()

    # provide_context is no longer needed in Airflow 2.x
    clean_xcom = PythonOperator(
        task_id="clean_xcom",
        python_callable=cleanup_xcom,
    )

    start = DummyOperator(task_id="start")
    end = DummyOperator(task_id="end", trigger_rule="none_failed")

    start >> clean_xcom >> end
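The `session=None, **context` signature matters because of how `provide_session` works in Airflow 2.x. The decorator looks for a `session` parameter in the decorated function's signature and raises an error if there is none. With this signature, the task context arrives as keyword arguments and the session is injected alongside it. The sketch below is a simplified, hypothetical re-implementation (`provide_fake_session`, not Airflow's actual code) that mimics this behaviour, using a plain object as the "session":

```python
import functools
import inspect


def provide_fake_session(func):
    """Simplified, hypothetical stand-in for a provide_session-style decorator."""
    if "session" not in inspect.signature(func).parameters:
        # Mirror the strict behaviour: refuse functions without a `session` parameter.
        raise ValueError(f"Function {func.__qualname__} has no `session` argument")

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if "session" not in kwargs:
            kwargs["session"] = object()  # placeholder for a real DB session
        return func(*args, **kwargs)

    return wrapper


@provide_fake_session
def cleanup_xcom(session=None, **context):
    """Receives the (demo) context as kwargs plus an injected session."""
    return context["dag_id"], session is not None  # dag_id: demo-only context key


print(cleanup_xcom(dag_id="cleanup_xcom_demo"))  # ('cleanup_xcom_demo', True)

error_message = ""
try:
    @provide_fake_session
    def broken(**context):
        return context
except ValueError as exc:
    error_message = str(exc)
print(error_message)
```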

Ferris
Osmandi
  • SO discourages code-only responses. Please highlight important portions of your solution with an explanation as to how/why it solves the OP's issue. It's also good to note caveats to using the code, if any. If helpful, links to source docs can also be included as reference. Most upvotes are gained over time as users learn something from your answer that they can apply to their own coding issues. – SherylHohman Nov 14 '20 at 15:36
  • this was the only solution that worked for me, I'm on Airflow 1.10.10 – cdabel Mar 17 '21 at 17:24

Using

from sqlalchemy import func 
[...]
session.query(XCom).filter(XCom.execution_date <= func.date('2019-06-01')).delete()

to filter by date (as proposed above) did not work for me. Instead I had to provide a datetime (including timezone):

import logging
from datetime import datetime, timedelta, timezone

from airflow.models import XCom
from airflow.operators import python_operator  # Airflow 1.10.x module
from airflow.utils.db import provide_session

[...]

@provide_session
def cleanup_xcom(session=None):
    # Timezone-aware cutoff: delete everything older than two days
    ts_limit = datetime.now(timezone.utc) - timedelta(days=2)
    session.query(XCom).filter(XCom.execution_date <= ts_limit).delete()
    logging.info(f"deleted all XComs older than {ts_limit}")

xcom_cleaner = python_operator.PythonOperator(
    task_id='delete-old-xcoms',
    python_callable=cleanup_xcom)

xcom_cleaner
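A related Python-side detail: `datetime.now(timezone.utc)` returns a timezone-aware value, and Python refuses to order-compare aware and naive datetimes. The stdlib sketch below computes the same two-day cutoff and applies it to a few hypothetical aware timestamps. It also shows that a naive timestamp cannot be compared with the cutoff. The fixed "now" is an assumption made so the demo is repeatable:

```python
from datetime import datetime, timedelta, timezone


def xcom_cutoff(now=None, days=2):
    """Return a timezone-aware UTC cutoff `days` before `now` (default: current time)."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now - timedelta(days=days)


now = datetime(2021, 3, 10, 12, 0, tzinfo=timezone.utc)  # fixed 'now' for the demo
limit = xcom_cutoff(now)
print(limit)  # 2021-03-08 12:00:00+00:00

# Hypothetical execution dates of stored XComs (all timezone-aware, UTC).
execution_dates = [
    datetime(2021, 3, 7, tzinfo=timezone.utc),
    datetime(2021, 3, 9, tzinfo=timezone.utc),
]
to_delete = [d for d in execution_dates if d <= limit]
print(len(to_delete))  # 1

naive_failed = False
try:
    _ = datetime(2021, 3, 7) <= limit  # naive vs aware
except TypeError:
    naive_failed = True
print(naive_failed)  # True
```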
laol