
I'm using Airflow to run tasks in Docker Swarm with DockerSwarmOperator. When a DockerSwarmOperator task finishes, the task stays in the running state in Airflow even though the corresponding container has already exited. Here is my docker-compose file:

version: '3.7'
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:latest}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: mysql+mysqldb://root:123456@mysql:3306/airflow?sql_mode=ALLOW_INVALID_DATES
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: mysql+mysqldb://root:123456@mysql:3306/airflow?sql_mode=ALLOW_INVALID_DATES
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION: 'true'
    AIRFLOW__CELERY__BROKER_URL: amqp://root:1234@rabbitmq:5672//
    AIRFLOW__CELERY__RESULT_BACKEND: redis://redis:6379/0
    AIRFLOW__WEBSERVER__SECRET_KEY: '8985edffbc5fae34c5a93aa580fb935675c87e568a83c6bbca98b4f9d93f3b50'
    AIRFLOW__WEBSERVER__SECRET_KEY_CMD: 'uuidgen'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - /var/run/docker.sock:/var/run/docker.sock
  user: "${AIRFLOW_UID:-50000}:0"

networks:
  my-network:
    external: true

services:

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "6969:8080"
    restart: always
    networks:
      - my-network
    deploy:
      replicas: 1
      placement:
        constraints: [ node.role == manager ]

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    restart: always
    networks:
      - my-network
    deploy:
      replicas: 1
      placement:
        constraints: [ node.role == manager ]


  airflow-worker:
    <<: *airflow-common
    command: celery worker
    restart: always
    networks:
      - my-network
    deploy:
      replicas: 1
      placement:
        constraints: [ node.role == manager ]

And here is the DAG:

import airflow
from airflow import DAG
from airflow.contrib.operators.docker_swarm_operator import DockerSwarmOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 5, 29),
    'email': ['airflow@sample.com'],
    'email_on_failure': True,
    'email_on_retry': False
}
dag = DAG(
    'docker_swarm_sample',         
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
)

with dag:
    t1 = DockerSwarmOperator(
        api_version='auto',                
        command='/bin/sleep 45',           
        image='192.168.12.50:5000/web',             
        auto_remove=True,                  
        task_id='sleep_with_swarm',        
    )
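
For reference, the same task written against the current provider package import path (the airflow.contrib path above is deprecated in Airflow 2.x); the explicit docker_url and enable_logging arguments here are only assumptions about settings that might matter, not a confirmed fix:

from airflow import DAG
from airflow.providers.docker.operators.docker_swarm import DockerSwarmOperator
from datetime import datetime

with DAG(
    'docker_swarm_sample',
    start_date=datetime(2023, 5, 29),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    t1 = DockerSwarmOperator(
        task_id='sleep_with_swarm',
        image='192.168.12.50:5000/web',
        command='/bin/sleep 45',
        api_version='auto',
        docker_url='unix://var/run/docker.sock',  # the socket mounted in the compose file
        auto_remove=True,
        enable_logging=False,  # assumption: disabling service-log streaming, to see if it matters
    )
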

What should I do to get the Airflow task marked as done?

I've checked permissions and the Airflow logs for errors, but there weren't any.
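
To narrow down whether the hang is on the Swarm side or in Airflow's polling, a rough stand-in for what the operator does (docker SDK only; the service name and the terminal-state list are illustrative assumptions, the image and command mirror the DAG above) would be:

import time
import docker

client = docker.DockerClient(base_url="unix://var/run/docker.sock")

# create a one-shot Swarm service, like the operator does
service = client.services.create(
    image="192.168.12.50:5000/web",
    command=["/bin/sleep", "45"],
    name="swarm-debug-sleep",
    restart_policy=docker.types.RestartPolicy(condition="none"),
)
try:
    while True:
        states = [t.get("Status", {}).get("State") for t in service.tasks()]
        print(states)
        # stop polling once every task of the service reports a terminal state
        if states and all(s in ("complete", "failed", "shutdown", "rejected") for s in states):
            break
        time.sleep(5)
finally:
    service.remove()
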
