Today I tried to create my first Airflow DAG:

from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'default_user',
    'start_date': days_ago(2),
    'depends_on_past': True,
    # With this set to true, the pipeline won't run if the previous day failed
    'email': ['demo@email.de'],
    'email_on_failure': True,
    # upon failure this pipeline will send an email to your email set above
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=30),
}

dag = DAG(
    'basic_dag_2',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)


def my_func():
    print('Hello from my_func')


bashtask = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

dummy_task = DummyOperator(task_id='dummy_task', retries=3)

python_task = PythonOperator(task_id='python_task', python_callable=my_func)

dummy_task.set_downstream(bashtask)
python_task.set_downstream(bashtask)

My Airflow is running properly on Python 3.6.8, but when the scheduler tries to import the DAG file into the DagBag, it throws this exception and I really don't know why:

[2020-05-11 17:11:15,601] {scheduler_job.py:1576} WARNING - No viable dags retrieved from /root/airflow/dags/first_dag.py
[2020-05-11 17:11:15,616] {scheduler_job.py:162} INFO - Processing /root/airflow/dags/first_dag.py took 0.031 seconds
[2020-05-11 17:12:05,647] {scheduler_job.py:154} INFO - Started process (PID=26569) to work on /root/airflow/dags/first_dag.py
[2020-05-11 17:12:05,653] {scheduler_job.py:1562} INFO - Processing file /root/airflow/dags/first_dag.py for tasks to queue
[2020-05-11 17:12:05,654] {logging_mixin.py:112} INFO - [2020-05-11 17:12:05,654] {dagbag.py:396} INFO - Filling up the DagBag from /root/airflow/dags/first_dag.py
[2020-05-11 17:12:05,666] {logging_mixin.py:112} INFO - [2020-05-11 17:12:05,662] {dagbag.py:239} ERROR - Failed to import: /root/airflow/dags/first_dag.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dagbag.py", line 236, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/usr/lib64/python3.6/imp.py", line 172, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 684, in _load
  File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/root/airflow/dags/first_dag.py", line 34, in <module>
    dag=dag,
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 70, in __init__
    super(BashOperator, self).__init__(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 422, in __init__
    self.dag = dag
  File "/usr/local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 548, in dag
    dag.add_task(self)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dag.py", line 1301, in add_task
    raise AirflowException("Task is missing the start_date parameter")
airflow.exceptions.AirflowException: Task is missing the start_date parameter

I thought that I might have to give my operators a start_date as well, but shouldn't they use the date from their DAG?

Luigi Drago

2 Answers

That is because two of your tasks (dummy_task and python_task) have not been assigned to the DAG, and the DAG is what carries the start_date in default_args. Pass dag=dag to them as well:

dummy_task = DummyOperator(task_id='dummy_task', retries=3, dag=dag)

python_task = PythonOperator(task_id='python_task', python_callable=my_func, dag=dag)
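
To make the mechanism concrete, here is a rough toy model in plain Python. It is not Airflow's code: ToyDag and ToyTask are hypothetical stand-ins, and the sketch assumes (as appears to be the case in Airflow 1.10) that default_args are only applied when the DAG is known at construction time.

```python
from datetime import datetime


class ToyDag:
    """Hypothetical stand-in for a DAG: it only stores default_args."""

    def __init__(self, dag_id, default_args=None):
        self.dag_id = dag_id
        self.default_args = default_args or {}
        self.tasks = []

    def add_task(self, task):
        # Same message as in the traceback; the real check lives in Airflow.
        if task.start_date is None:
            raise ValueError("Task is missing the start_date parameter")
        self.tasks.append(task)


class ToyTask:
    """Hypothetical stand-in for an operator."""

    def __init__(self, task_id, dag=None, start_date=None):
        self.task_id = task_id
        # Defaults are only visible if a DAG is passed at construction time.
        defaults = dag.default_args if dag is not None else {}
        self.start_date = start_date or defaults.get("start_date")
        self.dag = dag
        if dag is not None:
            dag.add_task(self)

    def set_downstream(self, other):
        # A task without a DAG is attached late, after defaults were skipped.
        if self.dag is None and other.dag is not None:
            other.dag.add_task(self)
            self.dag = other.dag


toy_dag = ToyDag("basic_dag_2", default_args={"start_date": datetime(2020, 5, 9)})
bash_like = ToyTask("print_date", dag=toy_dag)  # inherits start_date
orphan = ToyTask("dummy_task")                  # no DAG, so no start_date

try:
    orphan.set_downstream(bash_like)
    error_message = None
except ValueError as exc:
    error_message = str(exc)

print(error_message)
```

In this sketch the orphan task never sees default_args, so attaching it to the DAG later fails with the same message. Passing dag=toy_dag to ToyTask("dummy_task") makes the error go away, which is the toy equivalent of the fix above.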

Note that you can use the DAG object as a context manager, as described in https://airflow.apache.org/docs/stable/concepts.html#context-manager, to avoid repeating dag=dag for every task:

Example:

with DAG(
    'basic_dag_2',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
) as dag:

    bashtask = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    dummy_task = DummyOperator(task_id='dummy_task', retries=3)

    python_task = PythonOperator(task_id='python_task', python_callable=my_func)

    dummy_task.set_downstream(bashtask)
    python_task.set_downstream(bashtask)
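
If you are wondering how a with block can assign tasks without dag=dag, the idea can be sketched in plain Python. This is only a toy illustration with hypothetical ContextDag and ContextTask classes, not how Airflow is actually implemented: the DAG registers itself as "current" on entry, and tasks created in the meantime pick it up.

```python
class ContextDag:
    """Hypothetical DAG that tracks which `with` block is active."""

    _current = None  # the DAG whose `with` block is active, if any

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.task_ids = []

    def __enter__(self):
        # Remember the previously active DAG so nesting restores correctly.
        self._previous = ContextDag._current
        ContextDag._current = self
        return self

    def __exit__(self, exc_type, exc, tb):
        ContextDag._current = self._previous
        return False  # do not swallow exceptions


class ContextTask:
    """Hypothetical operator that falls back to the active DAG."""

    def __init__(self, task_id, dag=None):
        self.task_id = task_id
        self.dag = dag if dag is not None else ContextDag._current
        if self.dag is not None:
            self.dag.task_ids.append(task_id)


with ContextDag("basic_dag_2") as toy_dag:
    inside = ContextTask("print_date")  # picks up toy_dag implicitly

outside = ContextTask("dummy_task")     # created after the block: no DAG

print(inside.dag is toy_dag, outside.dag)
```

The practical takeaway is the same for the real thing: every operator has to be created inside the with block (note the indentation in the example above), otherwise it ends up without a DAG again.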
kaxil

I had the same issue. You simply need to pass dag=dag to each operator that you use. An operator needs a few more parameters (such as start_date) before it can run as a task, and those parameters are defined on the DAG.

An example. This is wrong:

postgres_task_1 = PostgresOperator(
    task_id="get_param_2",
    postgres_conn_id="aramis_postgres_connection",
    sql="""
        SELECT param_num_2 FROM public.aramis_meta_task
        """,
)

This is right:

postgres_task_1 = PostgresOperator(
    dag=dag,
    task_id="get_param_2",
    postgres_conn_id="aramis_postgres_connection",
    sql="""
        SELECT param_num_2 FROM public.aramis_meta_task
        """,
)
Aramis NSR