
Problem:

New DAGs, and changes to existing DAGs, are not showing up in the Airflow webserver, so I can't use them in the app.

For example, suppose I add a new DAG to the DAG directory.
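For illustration, a minimal sketch of the kind of file I mean (the file name, dag_id, schedule and task are placeholders, not my real DAG):

# /home/svc-air-analytics/airflow/dags/example_new_dag.py
# Minimal placeholder DAG -- dag_id, schedule and the single task are made up
# for this example.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    dag_id="example_new_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    DummyOperator(task_id="noop")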

What does work:

  • If I run $ airflow dags list, then the DAG does show up.
  • Similarly, if I query the database with select dag_id from dag; then the new DAG is present. So it is being picked up and written to the database.

What doesn't work:

  • The changes to the DAG do not show up in the web app.
  • If I look at the DAG source code in the dag_code table in the database, the source code is not being updated (see the quick check after this list).
  • If I restart or stop/start the webserver and/or the scheduler, the DAGs still do not show up in the webserver and the dag_code table does not change.
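To make the stale dag_code symptom concrete, this is roughly how I check it; a rough sketch using psycopg2 against the metadata database described below (credentials omitted; table and column names as I understand the Airflow 2.1 schema):

# check_dag_code.py -- rough sketch: compare what is registered in `dag`
# with when the stored source in `dag_code` was last refreshed.
# Adjust the DSN to your own credentials; password omitted here.
import psycopg2

conn = psycopg2.connect(
    "dbname=svc_air_analytics user=svc-air-analytics host=localhost port=5432"
)
with conn, conn.cursor() as cur:
    # DAGs the scheduler has registered
    cur.execute("SELECT dag_id, fileloc FROM dag ORDER BY dag_id;")
    for dag_id, fileloc in cur.fetchall():
        print("dag:     ", dag_id, fileloc)

    # When the stored source code was last refreshed
    cur.execute("SELECT fileloc, last_updated FROM dag_code ORDER BY last_updated;")
    for fileloc, last_updated in cur.fetchall():
        print("dag_code:", fileloc, last_updated)
conn.close()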

Finally, what does work:

  • If I run $ airflow db init again then the changes are picked up and everything works fine...

So my system is stable and usable only because I'm abusing the functionality of $ airflow db init. Since running this command does not affect the data in the database, I can actually work like this and just run it every time there is a change (sketched below). But since this isn't how it's meant to work, I'm concerned that it might be masking a deeper issue.
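Purely to illustrate the workaround (not something I consider a real fix), a stdlib-only polling sketch that re-runs airflow db init whenever a .py file under the DAGs folder changes; the path and interval are just examples:

# rerun_db_init.py -- illustration of the workaround only: re-run
# `airflow db init` whenever something under the DAGs folder changes.
import subprocess
import time
from pathlib import Path

DAGS_DIR = Path("/home/svc-air-analytics/airflow/dags")
POLL_SECONDS = 30  # example interval

def snapshot():
    # Map each .py file to its mtime so adds, edits and deletes are detected.
    return {p: p.stat().st_mtime for p in DAGS_DIR.rglob("*.py")}

last = snapshot()
while True:
    time.sleep(POLL_SECONDS)
    current = snapshot()
    if current != last:
        subprocess.run(["airflow", "db", "init"], check=True)
        last = current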

Any help would be greatly appreciated. I've listed my system specs and airflow set up below.

System Specifications and Airflow Setup:

System Specifications

  • OS: CentOS Linux 7 (Core)
  • Python Virtual Environment: Python 3.6.7, Airflow 2.1.3, pip 21.3.1
  • systemctl --version: systemd 219
  • psql (PostgreSQL) 9.2.23

Airflow setup:

airflow.cfg relevant parameters:

dags_folder = /home/svc-air-analytics/airflow/dags
base_log_folder = /home/svc-air-analytics/airflow/logs

...

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = LocalExecutor

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines; more information
# on their website
sql_alchemy_conn = postgresql+psycopg2://svc-air-analytics:***@localhost:5432/svc_air_analytics

...

# After how much time (seconds) new DAGs should be picked up from the filesystem
min_file_process_interval = 0

# How often (in seconds) to scan the DAGs directory for new files. Defaults to 5 minutes.
dag_dir_list_interval = 10

...
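To rule out the daemons resolving a different airflow.cfg than the one above, I print the effective values from inside the same virtualenv using Airflow's own config object (section names as in Airflow 2.1, where sql_alchemy_conn still lives under [core]):

# show_effective_config.py -- print the values the installed Airflow actually
# resolves; run inside the same virtualenv as the scheduler/webserver.
from airflow.configuration import conf

print("dags_folder               =", conf.get("core", "dags_folder"))
print("sql_alchemy_conn          =", conf.get("core", "sql_alchemy_conn"))
print("dag_dir_list_interval     =", conf.get("scheduler", "dag_dir_list_interval"))
print("min_file_process_interval =", conf.get("scheduler", "min_file_process_interval"))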

Core locations:

I'm running on an AWS EC2 instance as user svc-air-analytics. Key locations:

  • airflow.cfg location: /home/svc-air-analytics/airflow/airflow.cfg
  • dags location: /home/svc-air-analytics/airflow/dags/
  • Python virtual environment location: /home/env_svc_air_analytics

Systemctl setup (Running webserver and scheduler):

  • Environment file: /etc/sysconfig/airflow:
AIRFLOW_CONFIG=/home/svc-air-analytics/airflow/airflow.cfg
AIRFLOW_HOME=/home/svc-air-analytics/airflow
export PATH=$PATH:/home/svc-air-analytics/env_svc_air_analytics/bin/

  • Scheduler: /usr/lib/systemd/system/airflow-scheduler.service:
[Unit]
Description=Airflow scheduler daemon
After=network.target postgresql.service
Wants=postgresql.service

[Service]
EnvironmentFile=/etc/sysconfig/airflow
User=svc-air-analytics
Group=airflow
Type=simple
ExecStart=/home/svc-air-analytics/env_svc_air_analytics/bin/airflow scheduler
Restart=always
RestartSec=5s

[Install]
WantedBy=multi-user.target
  • Webserver:
[Unit]
Description=Airflow webserver daemon
After=network.target postgresql.service
Wants=postgresql.service

[Service]
Environment="PATH=/home/svc-air-analytics/env_svc_air_analytics/bin:/home/svc-air-analytics/airflow/"
User=svc-air-analytics
Group=airflow
Type=simple
ExecStart=/home/svc-air-analytics/env_svc_air_analytics/bin/python /home/svc-air-analytics/env_svc_air_analytics/bin/airflow webserver --pid /run/airflow/webserver.pid
Restart=on-failure
RestartSec=5s
PrivateTmp=true

[Install]
WantedBy=multi-user.target
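Since the scheduler unit loads /etc/sysconfig/airflow via EnvironmentFile while the webserver unit only sets PATH, I also wanted to see which environment the running daemons actually inherited. A small Linux-only sketch (pass the PIDs reported by systemctl status, and run it as the service user or root):

# check_service_env.py -- read /proc/<pid>/environ to see which AIRFLOW_HOME /
# AIRFLOW_CONFIG / PATH a running daemon inherited.
# Usage: python check_service_env.py PID [PID ...]
import sys

def environ_of(pid: int) -> dict:
    raw = open(f"/proc/{pid}/environ", "rb").read()
    pairs = (entry.split(b"=", 1) for entry in raw.split(b"\x00") if b"=" in entry)
    return {k.decode(): v.decode() for k, v in pairs}

for pid in map(int, sys.argv[1:]):
    env = environ_of(pid)
    print(pid, {k: env.get(k) for k in ("AIRFLOW_HOME", "AIRFLOW_CONFIG", "PATH")})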