I'm using Airflow to run tasks in Docker Swarm with DockerSwarmOperator. After a DockerSwarmOperator task completes, the task stays in the running state in Airflow even though the corresponding container has exited. Here is my docker-compose file:
version: '3.7'
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:latest}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: mysql+mysqldb://root:123456@mysql:3306/airflow?sql_mode=ALLOW_INVALID_DATES
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: mysql+mysqldb://root:123456@mysql:3306/airflow?sql_mode=ALLOW_INVALID_DATES
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION: 'true'
    AIRFLOW__CELERY__BROKER_URL: amqp://root:1234@rabbitmq:5672//
    AIRFLOW__CELERY__RESULT_BACKEND: redis://redis:6379/0
    AIRFLOW__WEBSERVER__SECRET_KEY: '8985edffbc5fae34c5a93aa580fb935675c87e568a83c6bbca98b4f9d93f3b50'
    AIRFLOW__WEBSERVER__SECRET_KEY_CMD: 'uuidgen'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - /var/run/docker.sock:/var/run/docker.sock
  user: "${AIRFLOW_UID:-50000}:0"
networks:
  my-network:
    external: true
services:
  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "6969:8080"
    restart: always
    networks:
      - my-network
    deploy:
      replicas: 1
      placement:
        constraints: [ node.role == manager ]
  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    restart: always
    networks:
      - my-network
    deploy:
      replicas: 1
      placement:
        constraints: [ node.role == manager ]
  airflow-worker:
    <<: *airflow-common
    command: celery worker
    restart: always
    networks:
      - my-network
    deploy:
      replicas: 1
      placement:
        constraints: [ node.role == manager ]
And here is the DAG:
import airflow
from airflow import DAG
from airflow.contrib.operators.docker_swarm_operator import DockerSwarmOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 5, 29),
    'email': ['airflow@sample.com'],
    'email_on_failure': True,
    'email_on_retry': False
}

dag = DAG(
    'docker_swarm_sample',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
)

with dag as dag:
    t1 = DockerSwarmOperator(
        api_version='auto',
        command='/bin/sleep 45',
        image='192.168.12.50:5000/web',
        auto_remove=True,
        task_id='sleep_with_swarm',
    )
What should I do so that the Airflow task is marked as finished once the container exits?
I've checked permissions and the Airflow logs for errors, but there weren't any.
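
In case it helps with reproducing this, below is a minimal sketch of how the state Swarm reports for a service's tasks could be compared with what Airflow shows. It is only an illustration, not something taken from the operator itself: the helper names (`task_states`, `all_tasks_finished`, `fetch_tasks`) are hypothetical, `fetch_tasks` assumes the `docker` Python SDK is installed and that the Docker socket is reachable, and the service name has to be looked up first (for example with `docker service ls`).

```python
# Swarm task states after which a task will not run again,
# following Docker's documented task lifecycle.
TERMINAL_STATES = {"complete", "failed", "shutdown", "rejected", "orphaned", "remove"}


def task_states(tasks):
    """Return the Status.State of each task dict, as shaped by the Docker API."""
    return [task.get("Status", {}).get("State", "unknown") for task in tasks]


def all_tasks_finished(tasks):
    """True when there is at least one task and every task is in a terminal state."""
    states = task_states(tasks)
    return bool(states) and all(state in TERMINAL_STATES for state in states)


def fetch_tasks(service_name):
    """Fetch the task dicts of a Swarm service (assumes the docker SDK is installed)."""
    import docker  # imported lazily so the helpers above work without the SDK

    return docker.from_env().services.get(service_name).tasks()
```

Calling `all_tasks_finished(fetch_tasks(name))` on the manager node while the Airflow task still shows as running would indicate whether Swarm itself already considers the service's tasks finished.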