
I am evaluating whether a framework like Airflow is capable of building a dynamic graph of tasks at runtime while the workflow is running, i.e. I don't know the specific tasks and their dependencies before starting the workflow; I only know the number of levels of the graph.

I started playing with Airflow and I am using XCom to keep the state of the graph as explained here: Proper way to create dynamic workflows in Airflow

I am also extending this approach a bit by storing, in the XCom rows, JSON snippets that describe task dependencies, for example:

{
    "key": "first_file",
    "tasks" :
        [
            {
              "task_id" : "third_task",
              "dependencies" : ["first_task", "second_task"]
            }
        ]
}
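
For context, a minimal sketch of how such a snippet might land in the folder the sensor (shown below) watches; the directory, filename and producer are placeholders for illustration only:

# Hypothetical producer: writes one task-description snippet into the
# folder that the sensor polls. Path and filename are placeholders.
import json
import os

MY_DIR = "/path/to/watched/folder"

snippet = {
    "key": "first_file",
    "tasks": [
        {
            "task_id": "third_task",
            "dependencies": ["first_task", "second_task"],
        }
    ],
}

with open(os.path.join(MY_DIR, "first_file.json"), "w") as f:
    json.dump(snippet, f)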

Note that I really do not need to rerun my DAGs: they are meant to be triggered externally, and once the first DagRun has completed, no tasks will be added, removed or modified after that. If a rerun were required, I would create a new DAG instead.

The technique I am using is the following: I create DAGs with two tasks, one of which is a sensor (this guarantees that the DagRun stays in the running state until the end):

import json
import os

# On Airflow 1.10.x; in Airflow 2 the module is airflow.sensors.base
from airflow.sensors.base_sensor_operator import BaseSensorOperator

MY_DIR = ...  # folder in which the task-description JSON files arrive (placeholder)


class WaitTasksSensor(BaseSensorOperator):
    ...
    def poke(self, context):
        # Push every task-description JSON file found in the folder to XCom
        files = os.listdir(MY_DIR)

        for f in files:
            filename = os.path.join(MY_DIR, f)
            json_file = open(filename).read()
            json_dict = json.loads(json_file)
            key = json_dict["key"]
            self.xcom_push(context, key + "_" + self.dag_id, json_file)

        # This sensor completes successfully only when the "end" task appears in the graph
        last_task_id = "end_" + self.dag_id
        return last_task_id in self.dag.task_ids
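
To illustrate how a run terminates (the naming here is my assumption, not something fixed by the code): since create_dynamic_tasks below appends "_" + dag_id to every task id, a final snippet whose task id is simply "end" is what makes "end_<dag_id>" appear among the DAG's task ids, so the sensor returns True. For example:

{
    "key": "last_file",
    "tasks" :
        [
            {
              "task_id" : "end",
              "dependencies" : ["third_task"]
            }
        ]
}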


from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator


def create_dags(dag_id):
    with DAG(dag_id, schedule_interval=None) as dag:
        first = DummyOperator(task_id="first_" + dag_id)

        wait_sensor = WaitTasksSensor(task_id="wait_sensor_" + dag_id, mode="reschedule")
        first >> wait_sensor

        pull_tasks(wait_sensor)  # Code below

    return dag

dag = create_dags("dag_1")
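
As a side note, if several such DAGs are needed, the same factory can be reused by registering each DAG in the module's globals so the scheduler discovers them all; a rough sketch, with the count and naming scheme as placeholders:

# Hypothetical: register a few dynamically created DAGs in the module
# namespace so the Airflow scheduler picks up all of them.
for i in range(1, 4):
    dag_id = "dag_{}".format(i)
    globals()[dag_id] = create_dags(dag_id)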

While the sensor pushes the JSON files representing tasks and their dependencies (which keep arriving in the folder), I try to pull the tasks from XCom in the DAG-definition code:

def pull_tasks(previous_task):
    current_dag = previous_task.dag
    dag_id = current_dag.dag_id
    last_run = current_dag.get_last_dagrun(include_externally_triggered=True)

    if not last_run:
        return

    last_run_date = last_run.execution_date
    task_instances = previous_task.get_task_instances(start_date=last_run_date)

    if not task_instances:
        return

    last_task_instance = task_instances[-1]

    json_ids = [...]  # the "key" values of the JSON snippets

    for json_id in json_ids:
        task_graph_json = last_task_instance.xcom_pull(task_ids=previous_task.task_id,
                                                       key=json_id + "_" + dag_id,
                                                       dag_id=dag_id)

        if task_graph_json:
            task_graph_deserialized = json.loads(task_graph_json)
            tasks = task_graph_deserialized["tasks"]
            create_dynamic_tasks(current_dag, tasks)

def create_dynamic_tasks(dag, task_dicts):
    dag_id = dag.dag_id

    for task_dict in task_dicts:
        task = DummyOperator(task_id=task_dict["task_id"] + "_" + dag_id,
                             dag=dag)
        dependencies = task_dict["dependencies"]

        for predecessor_id in dependencies:
            predecessor = dag.get_task(predecessor_id + "_" + dag_id)
            predecessor >> task

My question is: is Airflow a valid instrument for such a use case? Or am I stretching it a bit too far from its main use cases (i.e. fixed workflows with static tasks that are not generated at runtime)?

Would this approach scale to, let's say, tens of thousands of DAGs and hundreds of thousands of tasks? Or is there a similar tool that achieves this more easily?

Marco Bonifazi

1 Answer


Your question seems similar to this question. In the answers there, I suggested a rather ugly solution in case you really must use Airflow.

However, to answer your question: I'd recommend looking into Argo Workflows. Since it runs entirely on Kubernetes, it also scales very easily.

bartcode
  • Thanks! I tried Argo after your suggestion. I believe it is missing some important features I need that are present in Airflow, namely the possibility of browsing through the "tasks" ("steps" in Argo). The Argo UI looks a bit minimal, while Airflow seems to expose the right information at the granularity level I need (for example, in Airflow I can clearly see what parameters the tasks are passing around). Regarding scalability, I am more interested in "how to keep the representation of many tasks/DAGs with their dependencies and logs" than in "how to run many tasks at the same time". – Marco Bonifazi Nov 16 '20 at 18:45
  • I understand your experience with Argo, as I had the same initially. I think it's also a matter of getting used to it, because in the end you _can_ view the details per step in Argo in a similar way. And the inputs coming from other steps are quite clearly defined, too. When it comes to scalability: I think neither is ideal for creating that number of tasks. You could consider categorising those jobs and creating an Airflow/Argo instance for each of those categories. – bartcode Nov 17 '20 at 19:15