So I have this quite nice DAG in Airflow which basically runs several analysis steps (implemented as Airflow plugins) on binary files. The DAG is triggered by an FTP sensor which just checks whether there is a new file on the FTP server and then starts the whole workflow.
So currently the workflow looks like this: DAG is triggered on its schedule -> sensor waits for a new file on the FTP server -> analysis steps are executed -> end of workflow.
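For reference, here is a stripped-down sketch of my current DAG. The analysis operators are just placeholders for my real plugin operators, and `FTPSensor` only stands in for my sensor that checks for any new file; paths, DAG id and connection id are made up:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.ftp.sensors.ftp import FTPSensor


def analysis_step(**context):
    # placeholder for one of the analysis steps from my plugins
    pass


with DAG(
    dag_id="binary_analysis",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    wait_for_file = FTPSensor(
        task_id="wait_for_file",
        path="/incoming/sample.bin",  # my real sensor watches the whole directory
        ftp_conn_id="ftp_default",
    )

    step_1 = PythonOperator(task_id="analysis_step_1", python_callable=analysis_step)
    step_2 = PythonOperator(task_id="analysis_step_2", python_callable=analysis_step)

    wait_for_file >> step_1 >> step_2
```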
What I'd like to have is something like this: DAG is triggered -> sensor waits for a new file on the FTP server -> for every file on the FTP server the analysis steps are executed individually -> each workflow ends individually.
How do I get the analysis workflow to run once per file on the FTP server, while keeping only a single sensor waiting for a new file when the server is empty? I don't want to, e.g., start a DAG every second or so, because then I'd have many sensors just waiting for a new file.
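Just to make it clearer what I mean, this is roughly the direction I'm imagining (only a rough sketch, I'm not sure this is the right approach): a small controller DAG lists the files on the FTP server and triggers one run of the analysis DAG per file, passing the file path via `conf`. DAG ids, connection id and directory here are made up:

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.ftp.hooks.ftp import FTPHook


with DAG(
    dag_id="binary_analysis_controller",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:

    @task
    def list_new_files():
        # list every file currently sitting in the incoming directory
        hook = FTPHook(ftp_conn_id="ftp_default")
        return [
            {"file_path": f"/incoming/{name}"}
            for name in hook.list_directory("/incoming/")
        ]

    # one analysis DAG run per file, with the file path passed in via conf
    # (dynamic task mapping, Airflow 2.3+)
    TriggerDagRunOperator.partial(
        task_id="trigger_analysis",
        trigger_dag_id="binary_analysis",
    ).expand(conf=list_new_files())
```

But I'm not sure whether this is the intended way to do it, or how to combine it with a single waiting sensor, so any pointers are appreciated.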