I am trying to use Airflow to execute a simple Python task.
```python
from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
from pprint import pprint

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(dag_id='python_test', default_args=args)


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'


run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=print_context,
    dag=dag)
```
If I try, for example:

```
airflow test python_test print 2015-01-01
```
It works!
Now I want to put my `print_context(ds, **kwargs)` function in another Python file. So I create another file called simple_test.py and change:
```python
run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=simple_test.print_context,
    dag=dag)
```
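(For reference, simple_test.py just holds the function, and the DAG file imports it at the top; a minimal sketch of both, assuming the two files sit side by side in the dags folder:)

```python
# simple_test.py -- sits next to the DAG file (assumption: both live in the dags folder)
from pprint import pprint


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'
```

and at the top of the DAG file:

```python
# python_test.py -- the only addition relative to the version above
import simple_test  # plain top-level import of the sibling file
```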
Now I try to run again:

```
airflow test python_test print 2015-01-01
```

And OK! It still works!
But if I create a package, for example a `worker` package with a file SimplePython.py, import it (`from worker import SimplePython`), and try:

```
airflow test python_test print 2015-01-01
```

it gives the message:

```
ImportError: No module named worker
```
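For clarity, this is roughly the layout I mean (a sketch; `worker` and SimplePython.py are the actual names, the dags folder name is an assumption):

```python
# Layout (sketch):
#
#   dags/
#       python_test.py        # the DAG file shown above
#       worker/
#           __init__.py       # present, so worker should be importable as a package
#           SimplePython.py   # now holds print_context
#
# Relevant lines in python_test.py (the rest of the file is unchanged):
from airflow.operators.python_operator import PythonOperator
from worker import SimplePython  # <- this is the import that fails

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=SimplePython.print_context,
    dag=dag)
```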
The questions:
- Is it possible to import a module inside a DAG definition?
- How is Airflow+Celery going to distribute all the necessary Python source files across the worker nodes?