
I am using Airflow 1.9.0 with a custom SFTPOperator. I have code in my DAGs that polls an SFTP site to find new files. If any are found, I create custom task IDs for the dynamically created tasks and retrieve/delete the files.

    directory_list = sftp_handler('sftp-site', None, '/', None, SFTPToS3Operation.LIST)
    for file_path in directory_list:
        ...  # SFTP code that GETs the remote files

That part works fine. The problem is that both the airflow webserver and airflow scheduler iterate through all the DAGs about once a second and actually run the code that retrieves the directory_list. This means I'm hitting the SFTP site roughly twice a second just to authenticate and pull a list of files. I'd like to have some conditional code that only executes when the DAG is actually being run.

When an SFTP site uses password authentication, the number of times I connect really isn't an issue. But one site requires key authentication, and if there are too many authentication failures in a short timespan, the account is locked. During my testing this seems to happen occasionally, for reasons I'm still trying to track down.

However, if I authenticated only when the DAG was scheduled to execute, or was run manually, this would not be an issue. It also seems wasteful to spend so much time connecting to an SFTP site when the DAG isn't scheduled to run.

I've seen a post describing how to check whether a task is executing, but that's not ideal, as I'd have to create a long-running task, using up resources I shouldn't require, just to perform that test. Any thoughts on how to accomplish this?

Chris DeBracy

1 Answer


You have a very good use case for Airflow (SFTP to _____ batch jobs), but Airflow is not meant for dynamic DAGs as you are attempting to use them.

Top-Level DAG Code and the Scheduler Loop

As you noticed, any top-level code in a DAG file is executed on each scheduler loop. Put another way, every time the scheduler loop processes the files in your DAG directory, it interprets all the code in your DAG files; anything not inside a task or operator is interpreted/executed immediately. This puts undue strain on the scheduler as well as on any external systems you are making calls to.
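To make that concrete, here is a minimal sketch of moving the connection out of top-level code and into a task. The list_sftp_files helper is a stand-in for your sftp_handler LIST call, not a real API:

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    def list_sftp_files(site):
        # Stand-in for your sftp_handler(..., SFTPToS3Operation.LIST) call
        return []

    # BAD: a top-level call like this runs on every scheduler/webserver
    # parse of this file, roughly once a second
    # directory_list = list_sftp_files('sftp-site')

    def fetch_files(**context):
        # GOOD: inside a callable, this only runs when the task instance executes
        for file_path in list_sftp_files('sftp-site'):
            pass  # GET/DELETE the remote file here

    dag = DAG('sftp_example', start_date=datetime(2018, 1, 1),
              schedule_interval='@daily')

    fetch = PythonOperator(task_id='fetch_files',
                           python_callable=fetch_files,
                           provide_context=True,
                           dag=dag)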

Dynamic DAGs and the Airflow UI

Airflow does not handle dynamic DAGs well through the UI. This is mostly the result of DAG state not being stored in the database: DAG views and history are rendered from whatever exists in the interpreted DAG file at any given moment. I personally hope to see this change in the future with some form of DAG versioning.

In a dynamic DAG you can both add tasks to and remove tasks from the DAG.

Adding Tasks Dynamically

Adding a task to a DAG makes it appear (in the UI) in every earlier DAG run, even though those runs never executed it. Those task instances will have a None state, and each DAG run will be set to success or failed depending on the outcome of the run as a whole.

Removing Tasks Dynamically

If your dynamic DAG ever removes tasks, you will lose the ability to review the history of the DAG. For example, if you run a DAG with task_x in the first 20 DAG runs but then remove it, task_x will not show up in the UI until it is added back into the DAG.

Idempotency and Airflow

Airflow works best when DAG runs are idempotent: re-running any DAG run should have the same effect no matter when or how many times you run it. Dynamic DAGs break idempotency by adding tasks to, and removing tasks from, previous DAG runs, so re-running does not produce the same result.

Solution Options

You have at least two options moving forward:

1.) Continue to build your SFTP DAG dynamically, but create another DAG that writes the available SFTP files to a local file (if you are not using a distributed executor) or to an Airflow Variable (this will result in more reads against the Airflow DB), and build your DAG dynamically from that (see the first sketch after this list).

2.) Overload the SFTPOperator to take a list of files, so that every file that exists is processed within a single task run (see the second sketch below). This makes the DAG runs idempotent, and you will maintain an accurate history through the logs.
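A rough sketch of option 1, assuming an Airflow Variable named sftp_file_list; the Variable name and both DAG ids are placeholders:

    import json
    from datetime import datetime
    from airflow import DAG
    from airflow.models import Variable
    from airflow.operators.python_operator import PythonOperator

    # DAG 1: connects to the SFTP site once per schedule and caches the listing
    def refresh_file_list(**context):
        directory_list = []  # replace with your sftp_handler(...) LIST call
        Variable.set('sftp_file_list', json.dumps(directory_list))

    refresh_dag = DAG('sftp_refresh_listing', start_date=datetime(2018, 1, 1),
                      schedule_interval='@hourly')

    PythonOperator(task_id='refresh_file_list',
                   python_callable=refresh_file_list,
                   provide_context=True,
                   dag=refresh_dag)

    # DAG 2: built dynamically from the cached listing; parsing this file
    # now reads the Airflow DB instead of hitting the SFTP site
    process_dag = DAG('sftp_process_files', start_date=datetime(2018, 1, 1),
                      schedule_interval='@hourly')

    for file_path in json.loads(Variable.get('sftp_file_list', default_var='[]')):
        PythonOperator(task_id='get_' + file_path.replace('/', '_').strip('_'),
                       python_callable=lambda fp=file_path, **kw: None,  # GET fp here
                       dag=process_dag)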
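And a sketch of option 2. SFTPBatchOperator and its _list_files helper are invented here for illustration, not an existing Airflow class:

    from airflow.models import BaseOperator
    from airflow.utils.decorators import apply_defaults

    class SFTPBatchOperator(BaseOperator):
        """Hypothetical operator: handles every remote file in one task run."""

        @apply_defaults
        def __init__(self, sftp_conn_id, remote_dir, *args, **kwargs):
            super(SFTPBatchOperator, self).__init__(*args, **kwargs)
            self.sftp_conn_id = sftp_conn_id
            self.remote_dir = remote_dir

        def execute(self, context):
            # One connection and one authentication attempt per DAG run
            for file_path in self._list_files():
                self.log.info('Processing %s', file_path)
                # GET/DELETE file_path here

        def _list_files(self):
            return []  # stand-in: wrap your sftp_handler LIST call here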

I apologize for the extended explanation, but you're touching on one of the rough spots of Airflow and I felt it was appropriate to give an overview of the problem at hand.

andscoop
  • I appreciate the detailed response. I tend to find those interesting use cases. The Airflow Variable may make the most sense as I can use that to track the last time I actually created the connection and use it to limit how often I re-connect. – Chris DeBracy Aug 06 '18 at 14:49