We've been facing the same challenge: we want to generate datasets dynamically, and the current system doesn't appear to support that out of the box (see the DAG-processing code, which removes a dataset whenever it is not declared as an outlet or inlet in any task/DAG dependency).
However, the following code works for us:
```python
import random

from airflow.datasets import Dataset
from airflow.datasets.manager import DatasetManager
from airflow.decorators import task
from airflow.models.dataset import DatasetModel
from airflow.utils.session import NEW_SESSION, provide_session
from sqlalchemy.orm import Session


@task.python(task_id="generate_dataset", on_failure_callback=lambda x: None)
@provide_session  # injects a metadata-DB session at execution time
def executable_func(session: Session = NEW_SESSION, **context) -> list[Dataset]:
    datasets = [Dataset(f"s3://potato-{random.randint(1, 4)}"), Dataset("s3://tomato")]
    for dataset in datasets:
        stored_dataset = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).first()
        if not stored_dataset:
            # First time we see this URI: persist it so it is registered as a dataset.
            print(f"Dataset {dataset} not stored, proceeding to store it")
            session.add(DatasetModel.from_public(dataset))
        else:
            # Already known: emit a dataset event so downstream DAGs get triggered.
            print(f"Dataset {dataset} already stored, registering a dataset event instead")
            DatasetManager().register_dataset_change(
                task_instance=context["ti"], dataset=dataset, session=session
            )
        session.flush()
    session.commit()
    return datasets
```
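For context, here is a minimal sketch of the kind of producer DAG this task can be attached to. The `dag_id` and schedule below are illustrative, not our real ones; the only part that matters is calling `executable_func()` so the dataset events get emitted:

```python
from datetime import datetime

from airflow import DAG

# Hypothetical producer DAG; only the call to executable_func() matters.
with DAG(
    dag_id="dataset_output_test",
    description="Emit dataset events dynamically",
    schedule="@daily",
    start_date=datetime(2023, 5, 2),
    tags=["data", "test"],
):
    executable_func()
```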
Then we tested this approach with a dummy DAG waiting on the following datasets:
```python
from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="dataset_input_test",
    description="Test Input Datasets",
    schedule=[Dataset("s3://potato-2"), Dataset("s3://potato-3"), Dataset("s3://tomato")],
    start_date=datetime(2023, 5, 2),
    tags=["data", "test"],
) as dag:
    EmptyOperator(task_id="empty")
```
We could see that `potato-1` and `potato-4` always disappear from the Datasets view in the UI, since they are not explicitly referenced by any schedule, while the event counts for `potato-2` and `potato-3` keep increasing.
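If you want to double-check which dataset rows actually survive in the metadata database, a quick sketch like the following works (run it somewhere with DB access; `create_session` is the stock Airflow session helper):

```python
from airflow.models.dataset import DatasetModel
from airflow.utils.session import create_session

# List every dataset URI currently registered in the metadata DB.
with create_session() as session:
    for (uri,) in session.query(DatasetModel.uri).order_by(DatasetModel.uri):
        print(uri)
```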
Note: Airflow has some open issues and PRs trying to address external datasets, so I hope this post helps you as a workaround while the community keeps working in that direction.
Note 2: We are using Airflow 2.6.1, and while I'm not proud of this solution, it works.