
If I have multiple Airflow DAGs with some overlapping Python package dependencies, how can I keep each project's dependencies decoupled? E.g., if I had projects A and B on the same server, I would run each of them with something like...

source /path/to/virtualenv_a/activate
python script_a.py
deactivate
source /path/to/virtualenv_b/activate
python script_b.py
deactivate

Basically, I would like to run DAGs in the same situation (e.g., each DAG uses Python scripts that may have overlapping package dependencies that I would like to develop separately, i.e., not have to update all code using a package when I only want to update that package for one project). Note, I've been using the BashOperator to run Python tasks like...

do_stuff = BashOperator(
        task_id='my_task',
        bash_command='python /path/to/script.py',
        execution_timeout=timedelta(minutes=30),
        dag=dag)

Is there a way to get this working? Is there some other best-practice way that Airflow intends for people to address (or avoid) these kinds of problems?

lampShadesDrifter
  • There is a very interesting product, Oracle VM VirtualBox (one possible solution). You can have many independent virtual machines, each with its own software stack. It is free. – yu2 Oct 15 '19 at 23:57

3 Answers


Based on discussion from the apache-airflow mailing list, the simplest answer that addresses the modular way in which I am using various Python scripts for tasks is to directly call the virtualenv Python interpreter binary for each script or module, e.g.

source /path/to/virtualenv_a/activate
python script_a.py
deactivate
source /path/to/virtualenv_b/activate
python script_b.py
deactivate

would translate to something like

do_stuff_a = BashOperator(
        task_id='my_task_a',
        bash_command='/path/to/virtualenv_a/bin/python /path/to/script_a.py',
        execution_timeout=timedelta(minutes=30),
        dag=dag)
do_stuff_b = BashOperator(
        task_id='my_task_b',
        bash_command='/path/to/virtualenv_b/bin/python /path/to/script_b.py',
        execution_timeout=timedelta(minutes=30),
        dag=dag)

in an Airflow DAG.


As for passing args to the Tasks, it depends on the nature of the args you want to pass in. In my case, certain args depend on what a data table looks like on the day the DAG is run (e.g. the highest timestamp record in the table). To add these args to the Tasks, I have a "config" DAG that runs before this one. In the config DAG, one Task generates the args for the "real" DAG as a Python dict and writes them to a pickle file. The config DAG then has a TriggerDagRunOperator Task that activates the "real" DAG, which has initial logic to read the pickle file generated by the config DAG (in my case, as a dict) and interpolate it into the bash_command string, like bash_command=f"python script.py {configs['arg1']}".
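
A minimal sketch of that pattern, assuming Airflow 2.x import paths and illustrative names (generate_configs, /tmp/dag_configs.pkl, config_dag, real_dag) that are not from my actual setup:

# config_dag.py -- builds run-time args, pickles them, then triggers the "real" DAG.
import pickle
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

CONFIG_PATH = "/tmp/dag_configs.pkl"

def generate_configs():
    # In practice this would inspect the data table (e.g. its highest timestamp).
    configs = {"arg1": "2019-10-15"}
    with open(CONFIG_PATH, "wb") as f:
        pickle.dump(configs, f)

with DAG("config_dag", start_date=datetime(2019, 1, 1), schedule_interval=None) as config_dag:
    build_configs = PythonOperator(task_id="build_configs", python_callable=generate_configs)
    trigger_real = TriggerDagRunOperator(task_id="trigger_real_dag", trigger_dag_id="real_dag")
    build_configs >> trigger_real

# real_dag.py -- reads the pickle when the DAG file is parsed and interpolates
# the values into bash_command.
import pickle
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

try:
    with open("/tmp/dag_configs.pkl", "rb") as f:
        configs = pickle.load(f)
except FileNotFoundError:
    configs = {"arg1": ""}  # placeholder until the config DAG has run once

with DAG("real_dag", start_date=datetime(2019, 1, 1), schedule_interval=None) as real_dag:
    do_stuff_a = BashOperator(
        task_id="my_task_a",
        bash_command=f"/path/to/virtualenv_a/bin/python /path/to/script_a.py {configs['arg1']}",
        execution_timeout=timedelta(minutes=30),
    )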

lampShadesDrifter

You can use packaged DAGs, where each DAG is packaged together with its dependencies: http://airflow.apache.org/concepts.html#packaged-dags
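
A minimal sketch of what that can look like, assuming a hypothetical zip layout (my_dag.zip containing my_dag.py and a vendored my_deps package with a helper.do_stuff function); per the linked docs, the DAG file must sit at the root of the zip and only pure-Python dependencies can be bundled this way:

# my_dag.py, placed at the root of my_dag.zip. Project-specific dependencies are
# installed next to it before zipping (e.g. pip install --target=<build dir> <pkg>),
# so each DAG's zip carries its own copies of the packages it needs.
#
# Illustrative layout of the zip:
#   my_dag.zip
#   |-- my_dag.py
#   `-- my_deps/
#       |-- __init__.py
#       `-- helper.py
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

import my_deps.helper as helper  # resolved from inside the zip, not site-packages

with DAG("packaged_dag_a", start_date=datetime(2019, 1, 1), schedule_interval=None) as dag:
    run_it = PythonOperator(task_id="run_it", python_callable=helper.do_stuff)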

vermaji
  • 539
  • 2
  • 10

There are operators for running Python directly. A relatively new one, the PythonVirtualenvOperator, will create an ephemeral virtualenv, install your dependencies, run your Python callable, then tear down the environment. This adds some per-task overhead but is a functional (if not ideal) approach to your dependency-overlap issue.

https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#pythonvirtualenvoperator
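
A minimal sketch of that operator, assuming Airflow 2.x import paths and an illustrative callable and requirements list:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator

def run_script_a():
    # Imports must live inside the callable: it executes in the ephemeral
    # virtualenv, which only contains the packages listed in requirements.
    import pandas as pd  # illustrative per-task dependency
    print(pd.__version__)

with DAG("virtualenv_example", start_date=datetime(2019, 1, 1), schedule_interval=None) as dag:
    do_stuff_a = PythonVirtualenvOperator(
        task_id="my_task_a",
        python_callable=run_script_a,
        requirements=["pandas==1.3.5"],  # pin dependencies per task
        system_site_packages=False,      # keep the env isolated from the host install
    )

The trade-off is that the virtualenv is rebuilt on every task run, which is the per-task overhead mentioned above.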