Code:

from datetime import datetime, timedelta 

from airflow.operators.python import task
from airflow.operators.python_operator import PythonOperator
from airflow import DAG


@task
def get_content_body():
    b = 1
    print(b)


def get_content_body2(ti, **context):
    a = 1
    print(a)


default_args = {
    "owner": "Me",
    "depends_on_past": False,
    "start_date": datetime(2021, 1, 1),
    "email_on_failure": False,
    "email_on_retry": False,
    "pool": "default_pool",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "schedule_interval": "@daily",
}

with DAG(f"mydag", catchup=False, default_args=default_args) as dag:
    # task1
    _ = get_content_body()

    # task2
    t2 = PythonOperator(
        task_id="my_task2",
        python_callable=get_content_body2,
        provide_context=True,
    )
    t2

I'd like to connect the get_content_body task and my_task2 in a single DAG, but when I run it, Airflow 2.x reports the DAG as broken.

How can I connect them?

user3595632

1 Answer

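You can use the >> operator to set task dependencies in Airflow 2.

Keep the object returned by get_content_body() instead of discarding it in _, and chain it with t2 once t2 has been defined inside the with DAG block. This should work for your case:

body = get_content_body()

# my_task2 runs first, then get_content_body; use body >> t2 for the opposite order
t2 >> body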

You can find a more detailed overview here

Elad Kalif