I am evaluating whether a framework like Airflow is capable of building a dynamic graph of tasks at runtime, while the workflow is running, i.e. I don't know the specific tasks and their dependencies before starting the workflow; I only know the number of levels of the graph.
I started playing with Airflow and I am using XCom to keep the state of the graph, as explained here: Proper way to create dynamic workflows in Airflow
I am also extending this approach a bit by storing JSON snippets that describe task dependencies in the XCom rows, for example:
{
    "key": "first_file",
    "tasks": [
        {
            "task_id": "third_task",
            "dependencies": ["first_task", "second_task"]
        }
    ]
}
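To make the intent concrete, once the tasks are materialized that snippet should be equivalent to hand-writing something like the following (an illustration only; in my real code further down the task ids are additionally suffixed with the dag_id):

# Hypothetical hand-written equivalent of the JSON snippet above
first_task = DummyOperator(task_id="first_task", dag=dag)
second_task = DummyOperator(task_id="second_task", dag=dag)
third_task = DummyOperator(task_id="third_task", dag=dag)

# "dependencies": ["first_task", "second_task"] translates to:
[first_task, second_task] >> third_task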
Note that I do not need to rerun my DAGs: they are meant to be externally triggered, and once the first DagRun has completed, no tasks will be added, removed or modified after that. If a rerun were required, I would create a new DAG instead.
The technique I am using is the following: I create DAGs with two tasks, a dummy start task and a sensor (the sensor guarantees that the DagRun stays in the running state until the end):
import json
import os

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator


class WaitTasksSensor(BaseSensorOperator):
    ...

    def poke(self, context):
        files = os.listdir(MY_DIR)  # Folder where the JSON snippets keep arriving
        for f in files:
            filename = os.path.join(MY_DIR, f)
            with open(filename) as json_file:
                task_graph_json = json_file.read()
            json_dict = json.loads(task_graph_json)
            key = json_dict["key"]
            self.xcom_push(context, key + "_" + self.dag_id, task_graph_json)

        # This sensor completes successfully only when the "end" task appears in the graph
        last_task_id = "end_" + self.dag_id
        return last_task_id in self.dag.task_ids


def create_dags(dag_id):
    with DAG(dag_id, schedule_interval=None) as dag:
        first = DummyOperator(task_id="first_" + dag_id)
        wait_sensor = WaitTasksSensor(task_id="wait_sensor_" + dag_id, mode="reschedule")
        first >> wait_sensor
        pull_tasks(wait_sensor)  # Code below
    return dag


dag = create_dags("dag_1")
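Since I will eventually need many such DAGs, the factory would be called once per DAG id from the definition file, roughly along these lines (a sketch; the hard-coded list of ids is just a placeholder):

# Placeholder list of DAG ids; in reality they would come from an external source
for dynamic_dag_id in ["dag_1", "dag_2", "dag_3"]:
    # Register each DAG object in globals() so the scheduler discovers it
    globals()[dynamic_dag_id] = create_dags(dynamic_dag_id)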
While the sensor pushes the JSON files describing tasks and their dependencies (which keep arriving in a folder), the DAG definition code pulls them back from XCom and builds the tasks:
def pull_tasks(previous_task):
    current_dag = previous_task.dag
    dag_id = current_dag.dag_id

    last_run = current_dag.get_last_dagrun(include_externally_triggered=True)
    if not last_run:
        return

    last_run_date = last_run.execution_date
    task_instances = previous_task.get_task_instances(start_date=last_run_date)
    if not task_instances:
        return

    last_task_instance = task_instances[-1]

    json_ids = [...]  # Keys of the JSON snippets pushed by the sensor
    for json_id in json_ids:
        task_graph_json = last_task_instance.xcom_pull(task_ids=previous_task.task_id,
                                                       key=json_id + "_" + dag_id,
                                                       dag_id=dag_id)
        if task_graph_json:
            task_graph_deserialized = json.loads(task_graph_json)
            tasks = task_graph_deserialized["tasks"]
            create_dynamic_tasks(current_dag, tasks)


def create_dynamic_tasks(dag, task_dicts):
    dag_id = dag.dag_id
    for task_dict in task_dicts:
        task = DummyOperator(task_id=task_dict["task_id"] + "_" + dag_id,
                             dag=dag)
        dependencies = task_dict["dependencies"]
        for predecessor_id in dependencies:
            # Predecessors are expected to exist already, created from earlier snippets
            predecessor = dag.get_task(predecessor_id + "_" + dag_id)
            predecessor >> task
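For completeness, the JSON snippets are dropped into the watched folder by an external process. A rough sketch of such a producer is below (the path and the publish_task_graph helper are made up for illustration); the DagRun finishes once a snippet introducing the "end" task arrives, which is what makes the sensor's poke return True:

import json
import os

MY_DIR = "/path/to/watched/folder"  # placeholder for the folder the sensor polls

def publish_task_graph(key, tasks):
    # Write a snippet that the sensor will push to XCom on its next poke
    snippet = {"key": key, "tasks": tasks}
    with open(os.path.join(MY_DIR, key + ".json"), "w") as out:
        json.dump(snippet, out)

# The example snippet from the top of the question
publish_task_graph("first_file",
                   [{"task_id": "third_task",
                     "dependencies": ["first_task", "second_task"]}])

# A later snippet that introduces the "end" task (key and dependency are illustrative)
publish_task_graph("last_file",
                   [{"task_id": "end", "dependencies": ["third_task"]}])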
My question is: is Airflow a valid tool for such a use case, or am I stretching it too far from its main use cases (i.e. fixed workflows with static tasks that are not generated at runtime)?
Would this approach scale to, say, tens of thousands of DAGs and hundreds of thousands of tasks? Or is there another, similar tool that achieves this more easily?