I'm trying to run a DAG that calls a Docker container and executes a command inside it, but Airflow cannot execute the task. The error is below:

*** Reading local file: /opt/airflow/logs/docker_operator_dag/docker_command_hello/2021-05-26T02:40:13.171571+00:00/2.log
[2021-05-26 02:45:26,001] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: docker_operator_dag.docker_command_hello 2021-05-26T02:40:13.171571+00:00 [queued]>
[2021-05-26 02:45:26,030] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: docker_operator_dag.docker_command_hello 2021-05-26T02:40:13.171571+00:00 [queued]>
[2021-05-26 02:45:26,031] {taskinstance.py:1068} INFO - 
--------------------------------------------------------------------------------
[2021-05-26 02:45:26,031] {taskinstance.py:1069} INFO - Starting attempt 2 of 2
[2021-05-26 02:45:26,032] {taskinstance.py:1070} INFO - 
--------------------------------------------------------------------------------
[2021-05-26 02:45:26,060] {taskinstance.py:1089} INFO - Executing <Task(DockerOperator): docker_command_hello> on 2021-05-26T02:40:13.171571+00:00
[2021-05-26 02:45:26,080] {standard_task_runner.py:52} INFO - Started process 67 to run task
[2021-05-26 02:45:26,083] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'docker_operator_dag', 'docker_command_hello', '2021-05-26T02:40:13.171571+00:00', '--job-id', '11', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/docker_job/docker-job.py', '--cfg-path', '/tmp/tmp3vl5dv7x', '--error-file', '/tmp/tmptazwx_tc']
[2021-05-26 02:45:26,084] {standard_task_runner.py:77} INFO - Job 11: Subtask docker_command_hello
[2021-05-26 02:45:26,181] {logging_mixin.py:104} INFO - Running <TaskInstance: docker_operator_dag.docker_command_hello 2021-05-26T02:40:13.171571+00:00 [running]> on host f1e5cfe4a07f
[2021-05-26 02:45:26,219] {taskinstance.py:1283} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=docker_operator_dag
AIRFLOW_CTX_TASK_ID=docker_command_hello
AIRFLOW_CTX_EXECUTION_DATE=2021-05-26T02:40:13.171571+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-05-26T02:40:13.171571+00:00
[2021-05-26 02:45:26,227] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 677, in urlopen
    chunked=chunked,
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 392, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/local/lib/python3.6/http/client.py", line 1287, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/local/lib/python3.6/http/client.py", line 1333, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.6/http/client.py", line 1282, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.6/http/client.py", line 1042, in _send_output
    self.send(msg)
  File "/usr/local/lib/python3.6/http/client.py", line 980, in send
    self.connect()
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/transport/unixconn.py", line 43, in connect
    sock.connect(self.unix_socket)
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 727, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/util/retry.py", line 410, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/packages/six.py", line 734, in reraise
    raise value.with_traceback(tb)
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 677, in urlopen
    chunked=chunked,
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 392, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/local/lib/python3.6/http/client.py", line 1287, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/local/lib/python3.6/http/client.py", line 1333, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.6/http/client.py", line 1282, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.6/http/client.py", line 1042, in _send_output
    self.send(msg)
  File "/usr/local/lib/python3.6/http/client.py", line 980, in send
    self.connect()
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/transport/unixconn.py", line 43, in connect
    sock.connect(self.unix_socket)
urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionRefusedError(111, 'Connection refused'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 207, in _retrieve_server_version
    return self.version(api_version=False)["ApiVersion"]
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/daemon.py", line 181, in version
    return self._result(self._get(url), json=True)
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/utils/decorators.py", line 46, in inner
    return f(self, *args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 230, in _get
    return self.get(url, **self._set_request_timeout(kwargs))
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/sessions.py", line 555, in get
    return self.request('GET', url, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/sessions.py", line 542, in request
    resp = self.send(prep, **send_kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/sessions.py", line 655, in send
    r = adapter.send(request, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/adapters.py", line 498, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionRefusedError(111, 'Connection refused'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 287, in execute
    self.cli = self._get_cli()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 319, in _get_cli
    return APIClient(base_url=self.docker_url, version=self.api_version, tls=tls_config)
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 190, in __init__
    self._version = self._retrieve_server_version()
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 215, in _retrieve_server_version
    'Error while fetching server API version: {0}'.format(e)
docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', ConnectionRefusedError(111, 'Connection refused'))
[2021-05-26 02:45:26,230] {taskinstance.py:1532} INFO - Marking task as FAILED. dag_id=docker_operator_dag, task_id=docker_command_hello, execution_date=20210526T024013, start_date=20210526T024526, end_date=20210526T024526
[2021-05-26 02:45:26,261] {local_task_job.py:146} INFO - Task exited with return code 1

I'm using the Airflow docker-compose setup from https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html and I'm trying to run the following DAG:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.providers.docker.operators.docker import DockerOperator

default_args = {
    'owner': 'airflow',
    'description': 'Use of the DockerOperator',
    'depends_on_past': False,
    'start_date': datetime(2021, 5, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG('docker_operator_dag', default_args=default_args, schedule_interval="5 * * * *", catchup=False) as dag:
    start_dag = DummyOperator(
        task_id='start_dag'
        )

    end_dag = DummyOperator(
        task_id='end_dag'
        )        

    t1 = BashOperator(
        task_id='print_current_date',
        bash_command='date'
        )
        
    t2 = DockerOperator(
        task_id='docker_command_sleep',
        image='docker_image_task',
        container_name='task___command_sleep',
        api_version='auto',
        auto_remove=True,
        command="/bin/sleep 30",
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge"
        )

    t3 = DockerOperator(
        task_id='docker_command_hello',
        image='docker_image_task',
        container_name='task___command_hello',
        api_version='auto',
        auto_remove=True,
        command="/bin/sleep 40",
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge"
        )

    t4 = BashOperator(
        task_id='print_hello',
        bash_command='echo "hello world"'
        )

    start_dag >> t1 
    
    t1 >> t2 >> t4
    t1 >> t3 >> t4

    t4 >> end_dag
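
From the traceback, the operator fails inside _get_cli, where it builds a docker APIClient against docker_url. For reference, the same failure can be reproduced without Airflow, from inside one of the Airflow containers, using the docker SDK that the provider already installs (just a minimal sketch, not part of the DAG):

import docker

# Same call the operator makes in _get_cli; with version="auto" the constructor
# immediately queries the daemon, so it raises the same DockerException
# ("Error while fetching server API version: ...") when the socket is unreachable.
client = docker.APIClient(base_url="unix://var/run/docker.sock", version="auto")
print(client.version())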

Also, I'm using Windows 10. I've tried adding - "/var/run/docker.sock:/var/run/docker.sock" to the volumes section; that works on Ubuntu, but on Windows it doesn't.
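
Since ConnectionRefusedError(111) usually means the socket file is present but nothing is listening on it (a missing file would give "No such file or directory" instead), a low-level check like the one below, run from inside the worker container, should tell the two cases apart. This is only a diagnostic sketch; the path is the same one used in docker_url:

import os
import socket

path = "/var/run/docker.sock"  # same path as docker_url in the DAG
print("socket file exists:", os.path.exists(path))

s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
    s.connect(path)  # ConnectionRefusedError(111) here matches the traceback above
    print("something is listening on the socket")
except OSError as err:
    print("connect failed:", err)
finally:
    s.close()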

As requested, here is the docker-compose file:

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME         - Docker image name used to run Airflow.
#                              Default: apache/airflow:master-python3.8
# AIRFLOW_UID                - User ID in Airflow containers
#                              Default: 50000
# AIRFLOW_GID                - Group ID in Airflow containers
#                              Default: 50000
# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account.
#                              Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account.
#                              Default: airflow
#
# Feel free to modify this file to suit your needs.
---
version: '3'
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.2}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 5 # Just to have a fast load in the front-end. Do not use it in production with those configurations.
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    AIRFLOW__CORE__ENABLE_XCOM_PICKLING: 'true' # "_run_image of the DockerOperator returns now a python string, not a byte string" Ref: https://github.com/apache/airflow/issues/13487
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - "/var/run/docker.sock:/var/run/docker.sock" # We will pass the Docker Deamon as a volume to allow the webserver containers start docker images. Ref: https://stackoverflow.com/q/51342810/7024760
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  depends_on:
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    restart: always

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    restart: always

  airflow-init:
    <<: *airflow-common
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

volumes:
  postgres-db-volume:

BrenoShelby
  • You should attach your docker-compose file to check that the ports are opened and mapped correctly – Gpack May 28 '21 at 01:29
  • @Gpack Thanks for the comment! The docker-compose file can be downloaded here [link](https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html). – BrenoShelby May 28 '21 at 02:02
  • Did you find a solution to this? – EnE_ Jun 24 '21 at 03:30
  • I was running into this same issue earlier; [this answer](https://stackoverflow.com/questions/59791701/how-to-execute-docker-command-from-inside-docker-container-on-windows-host) helped – Kevin Grimm Jul 15 '21 at 19:03

0 Answers