Problem:
New DAGs, and changes to existing DAGs, are not showing up in the Airflow webserver UI.
For example, suppose I add a new DAG to the DAGs directory:
What does work:
- If I run
$ airflow dags list
then the new DAG does show up.
- Similarly, if I look in the database using
select dag_id from dag;
then the new DAG is present. So it is being picked up and written to the database.
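For reference, I run that check directly through psql against the same connection as sql_alchemy_conn, roughly like this (user/database names are the ones from my config; adjust as needed):
$ psql -h localhost -U svc-air-analytics -d svc_air_analytics -c "select dag_id from dag;"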
What doesn't work:
- The changes to the DAG do not show up in the web app.
- If I look at the DAG source code stored in the database, in the dag_code table, the source code is not being updated.
- If I restart or stop/start the webserver and/or scheduler, the DAGs still do not show up in the webserver or change in the dag_code table.
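To be concrete, I'm inspecting the stored source with something along these lines (column names are from my 2.1.3 schema, and the LIKE filter is just a placeholder for the DAG file in question):
$ psql -h localhost -U svc-air-analytics -d svc_air_analytics -c "select fileloc, last_updated from dag_code where fileloc like '%<my_dag_file>%';"
The source_code column (and, as far as I can tell, last_updated) does not reflect the edits.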
Finally, what does work:
- If I run
$ airflow db init
again, then the changes are picked up and everything works fine.
So my system is stable and usable, but only because I'm abusing
$ airflow db init
as a workaround. Since running this command does not affect the data already in the database, I can actually work like this and just run it every time there is a change. But since this isn't how things are meant to work, I'm concerned it might be masking a deeper issue.
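Sketching the current workflow (a stopgap, not a recommendation):
# edit or add a DAG under /home/svc-air-analytics/airflow/dags/
$ airflow db init    # re-run the init; existing metadata is left intact
# refresh the web UI - the new/changed DAG now appears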
Any help would be greatly appreciated. I've listed my system specs and Airflow setup below.
System Specifications and Airflow Setup:
System Specifications
- OS: CentOS Linux 7 (Core)
- Python Virtual Environment: Python 3.6.7, Airflow 2.1.3, pip 21.3.1
- systemctl --version: systemd 219
- psql (PostgreSQL) 9.2.23
Airflow setup:
Relevant airflow.cfg parameters:
dags_folder = /home/svc-air-analytics/airflow/dags
base_log_folder = /home/svc-air-analytics/airflow/logs
...
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = LocalExecutor
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
sql_alchemy_conn = postgresql+psycopg2://svc-air-analytics:***@localhost:5432/svc_air_analytics
...
# after how much time (seconds) a new DAGs should be picked up from the filesystem
min_file_process_interval = 0
# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 10
...
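For reference, the effective values can be confirmed from the CLI with something like:
$ airflow config list | grep -E 'dags_folder|dag_dir_list_interval|min_file_process_interval'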
Core locations:
I'm running on an AWS EC2 instance as user svc-air-analytics. Key locations:
- airflow.cfg location: /home/svc-air-analytics/airflow/airflow.cfg
- dags location: /home/svc-air-analytics/airflow/dags/
- Python virtual environment location: /home/env_svc_air_analytics
Systemctl setup (running webserver and scheduler):
- Environment file: /etc/sysconfig/airflow:
AIRFLOW_CONFIG=/home/svc-air-analytics/airflow/airflow.cfg
AIRFLOW_HOME=/home/svc-air-analytics/airflow
export PATH=$PATH:/home/svc-air-analytics/env_svc_air_analytics/bin/
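If it matters, the environment that the live scheduler process actually sees (as opposed to my interactive shell) can be checked with plain /proc inspection, e.g.:
$ sudo tr '\0' '\n' < /proc/$(pgrep -f 'airflow scheduler' | head -n1)/environ | grep AIRFLOW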
- Scheduler: /usr/lib/systemd/system/airflow-scheduler.service:
[Unit]
Description=Airflow scheduler daemon
After=network.target postgresql.service
Wants=postgresql.service
[Service]
EnvironmentFile=/etc/sysconfig/airflow
User=svc-air-analytics
Group=airflow
Type=simple
ExecStart=/home/svc-air-analytics/env_svc_air_analytics/bin/airflow scheduler
Restart=always
RestartSec=5s
[Install]
WantedBy=multi-user.target
- Webserver:
[Unit]
Description=Airflow webserver daemon
After=network.target postgresql.service
Wants=postgresql.service
[Service]
Environment="PATH=/home/svc-air-analytics/env_svc_air_analytics/bin:/home/svc-air-analytics/airflow/"
User=svc-air-analytics
Group=airflow
Type=simple
ExecStart=/home/svc-air-analytics/env_svc_air_analytics/bin/python /home/svc-air-analytics/env_svc_air_analytics/bin/airflow webserver --pid /run/airflow/webserver.pid
Restart=on-failure
RestartSec=5s
PrivateTmp=true
[Install]
WantedBy=multi-user.target
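For completeness, this is how I restart the services after changing a unit file or a DAG (standard systemd commands; I'm assuming here that the webserver unit is installed as airflow-webserver.service):
$ sudo systemctl daemon-reload
$ sudo systemctl restart airflow-scheduler airflow-webserver
$ sudo journalctl -u airflow-scheduler -f    # tail the scheduler logs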