
I want to create a pipeline scheduler using Airflow that will execute 5 Python scripts stored in my home directory. The scripts are test1.py, test2.py, test3.py, test4.py and final.py. How should I load these scripts into Airflow? Can anybody help me out with a code snippet? I am new to Airflow; I tried the tutorials, but I am not able to understand how to use the scheduler.

Please do not mark this question as a duplicate; I really need to understand this.

1 Answer


Given files test1.py, test2.py and test3.py, each structured like

# this is `test1.py`
def entry_point_1():
    print("entry_point_1")

you can create test_dag.py in the same folder:

.
├── __init__.py
├── test1.py
├── test2.py
├── test3.py
└── test_dag.py

There are 2 straightforward approaches

1. Using PythonOperator

# this is `test_dag.py`
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

import test1, test2, test3

dag_python: DAG = DAG(dag_id="dag_using_python_op",
                      start_date=datetime(year=2019, month=1, day=14),
                      schedule_interval=None)
python_op_1: PythonOperator = PythonOperator(dag=dag_python,
                                             task_id="python_op_1",
                                             python_callable=test1.entry_point_1)
python_op_2: PythonOperator = PythonOperator(dag=dag_python,
                                             task_id="python_op_2",
                                             python_callable=test2.entry_point_2)
python_op_3: PythonOperator = PythonOperator(dag=dag_python,
                                             task_id="python_op_3",
                                             python_callable=test3.entry_point_3)
python_op_1 >> python_op_2 >> python_op_3
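
The question mentions 5 scripts, and the same pattern extends to the remaining two. The snippet below is only a sketch: it assumes test4.py and final.py expose entry-point functions analogous to the ones above (entry_point_4 and entry_point_final are illustrative names, not taken from the original files).

# continuation of `test_dag.py` -- sketch only; entry-point names are assumed
import test4, final

python_op_4: PythonOperator = PythonOperator(dag=dag_python,
                                             task_id="python_op_4",
                                             python_callable=test4.entry_point_4)
python_op_final: PythonOperator = PythonOperator(dag=dag_python,
                                                 task_id="python_op_final",
                                                 python_callable=final.entry_point_final)
# extend the chain so all five scripts run one after another
python_op_3 >> python_op_4 >> python_op_final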

2. Using BashOperator

# this is `test_dag.py`
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

dag_bash: DAG = DAG(dag_id="dag_using_bash_op",
                    start_date=datetime(year=2019, month=1, day=14),
                    schedule_interval=None)
bash_op_1: BashOperator = BashOperator(dag=dag_bash,
                                       task_id="bash_op_1",
                                       bash_command="python -c 'import test1; test1.entry_point_1()")
bash_op_2: BashOperator = BashOperator(dag=dag_bash,
                                       task_id="bash_op_2",
                                       bash_command="python -c 'import test2; test2.entry_point_2()'")
bash_op_3: BashOperator = BashOperator(dag=dag_bash,
                                       task_id="bash_op_3",
                                       bash_command="python -c 'import test3; test3.entry_point_3()'")
bash_op_1 >> bash_op_2 >> bash_op_3

Note: you'll have to fix PYTHONPATH so the python -c commands can import the scripts; I couldn't get it working myself, but give it a try (and do report your findings in the comments)
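
For instance, one hedged option (untested; /home/your_user is just a placeholder for wherever the scripts live) is to hand PYTHONPATH to the command through the operator's env argument; note that when env is given, it is used instead of inheriting the current environment:

# sketch: point PYTHONPATH at the folder containing the scripts so `import test1` resolves
bash_op_1: BashOperator = BashOperator(dag=dag_bash,
                                       task_id="bash_op_1",
                                       env={"PYTHONPATH": "/home/your_user"},  # placeholder path
                                       bash_command="python -c 'import test1; test1.entry_point_1()'")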


y2k-shubham
  • `# this is test1.py / def entry_point_1(): print("entry_point_1")` — I didn't understand this one. I read the .py files using BashOperator, but I am unable to pass arguments, like `--output "data-folder"` – Anonymous_hacker Jan 16 '19 at 06:44
  • **@Anonymous_hacker** Recall that `BashOperator` merely runs a `bash_command`; so the problem boils down to being able to invoke some function in a `Python` script / module from the (bash) command line (not the `Python` shell). And, as you said, in your case this also needs passing arguments. I'm no `Python` expert, but Google is your friend – y2k-shubham Jan 16 '19 at 07:00
  • **@Anonymous_hacker** also note that `BashOperator` allows you to pass [`env` (*environment variables*) for your `bash_command`](https://github.com/apache/airflow/blob/v1-10-stable/airflow/operators/bash_operator.py#L113) – y2k-shubham Jan 16 '19 at 07:08
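
Following up on the argument-passing question in the comments: a possible sketch (the path and flag are placeholders, and it assumes test1.py parses its own command line, e.g. with argparse) is to invoke the script file directly instead of going through python -c:

# sketch: run the script file itself and pass arguments on the command line
# assumes `test1.py` handles `--output` via argparse; path and value are placeholders
bash_op_args: BashOperator = BashOperator(dag=dag_bash,
                                          task_id="bash_op_args",
                                          bash_command="python /home/your_user/test1.py --output data-folder")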