
I'd like to set the inlets and outlets parameters automatically from within the executable function of a PythonOperator.

However, it doesn't seem to work, even though I would expect it to. You can find the code snippet below:

from airflow.operators.python import PythonOperator
from datahub_provider import entities

def executable_func(**kwargs):
    # Attach lineage to the task object taken from the context
    task = kwargs.get("task")
    task.inlets = [entities.Dataset(source, data_path) ...]
    task.outlets = [entities.Dataset(source, data_path) ...]
    ...

dag_task = PythonOperator(
    task_id="task_id",
    python_callable=executable_func,
    provide_context=True,
    dag=dag,
)

I also tried the following way:

def executable_func(**kwargs):
    task = kwargs.get("ti")  # the TaskInstance from the context
    task.inlets = [entities.Dataset(source, data_path) ...]
    task.outlets = [entities.Dataset(source, data_path) ...]
    ...

...

Is there a way to do what I want with the standard Airflow installation?

By the way, we use Airflow 2.1.2.
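
For context, the static variant, where inlets/outlets are declared on the operator itself, is exactly what I'm trying to avoid, since our datasets are only known at runtime. A rough sketch of that variant (the platform and table names below are just placeholders):

from airflow.operators.python import PythonOperator
from datahub_provider import entities

# Static declaration: the inlets/outlets must be known at DAG-parse time,
# which is what we want to avoid.
dag_task = PythonOperator(
    task_id="task_id",
    python_callable=executable_func,
    inlets=[entities.Dataset("snowflake", "db.schema.source_table")],
    outlets=[entities.Dataset("snowflake", "db.schema.target_table")],
    dag=dag,
)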

1 Answer


I've been experiencing the same challenge, since we would like to generate datasets dynamically, and it looks like the current system doesn't support this out of the box (see the DAG-processing code, which removes a dataset whenever it is not declared as an inlet or outlet in any task/DAG definition).
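
For reference, out of the box a dataset only sticks around when it is referenced statically in a DAG file, roughly like this (a minimal sketch; the DAG id, operator and URI are only illustrative):

from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator

# The dataset URI is known at DAG-parse time, so the scheduler keeps it
# and records a dataset event whenever the task succeeds.
with DAG(dag_id="static_tomato_producer", start_date=datetime(2023, 5, 2), schedule=None):
    BashOperator(
        task_id="produce_tomato",
        bash_command="echo 'writing to s3://tomato'",
        outlets=[Dataset("s3://tomato")],
    )

Since our URIs are only computed at runtime, that pattern doesn't fit our case.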

However, it looks like it works for us using the following code:

import random

from sqlalchemy.orm import Session

from airflow.datasets import Dataset
from airflow.datasets.manager import DatasetManager
from airflow.decorators import task
from airflow.models import TaskInstance
from airflow.models.dataset import DatasetModel
from airflow.utils.session import NEW_SESSION, provide_session


@provide_session
@task.python(task_id="generate_dataset", on_failure_callback=lambda x: None)
def executable_func(session: Session = NEW_SESSION, **context: TaskInstance) -> list[Dataset]:
    # Dataset URIs are only known at runtime
    datasets = [Dataset(f"s3://potato-{random.randint(1, 4)}"), Dataset("s3://tomato")]
    for dataset in datasets:
        stored_dataset = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).first()
        if not stored_dataset:
            print(f"Dataset {dataset} not stored, proceeding to store it")
            dataset_model = DatasetModel.from_public(dataset)
            session.add(dataset_model)
        else:
            print(f"Dataset {dataset} already stored, registering a dataset event instead")
            dm = DatasetManager()
            dm.register_dataset_change(task_instance=context["ti"], dataset=dataset, session=session)

    session.flush()
    session.commit()
    return datasets

Then, we tested this approach with a dummy DAG that waits for the following datasets:

from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="dataset_input_test",
    description="Test Input Datasets",
    schedule=[Dataset("s3://potato-2"), Dataset("s3://potato-3"), Dataset("s3://tomato")],
    start_date=datetime(2023, 5, 2),
    tags=["data", "test"],
) as dag:
    EmptyOperator(task_id="empty")

We could see that potato-1 & potato-4 always disappear from the Datasets section of the UI, since they are not explicitly referenced anywhere, while the event counts for potato-2 & potato-3 keep increasing.
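
To double-check this outside the UI, here is a quick sketch (run anywhere the Airflow metadata DB is reachable; nothing in it is specific to our DAGs) that lists the dataset URIs currently stored:

from airflow.models.dataset import DatasetModel
from airflow.utils.session import create_session

# Print every dataset URI currently present in the metadata database.
with create_session() as session:
    for stored_dataset in session.query(DatasetModel).all():
        print(stored_dataset.uri)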

Note: Airflow has some open issues & PRs trying to address externally updated datasets, so I hope this post helps you as a workaround while the community keeps working in this direction.

Note 2: We are using Airflow 2.6.1, and while I'm not proud of our solution, it does seem to work.
