I'm trying to run a DAG that calls a docker container and executes a command inside it, but Airflow cannot execute the task. Follows the error launched:
*** Reading local file: /opt/airflow/logs/docker_operator_dag/docker_command_hello/2021-05-26T02:40:13.171571+00:00/2.log
[2021-05-26 02:45:26,001] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: docker_operator_dag.docker_command_hello 2021-05-26T02:40:13.171571+00:00 [queued]>
[2021-05-26 02:45:26,030] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: docker_operator_dag.docker_command_hello 2021-05-26T02:40:13.171571+00:00 [queued]>
[2021-05-26 02:45:26,031] {taskinstance.py:1068} INFO -
--------------------------------------------------------------------------------
[2021-05-26 02:45:26,031] {taskinstance.py:1069} INFO - Starting attempt 2 of 2
[2021-05-26 02:45:26,032] {taskinstance.py:1070} INFO -
--------------------------------------------------------------------------------
[2021-05-26 02:45:26,060] {taskinstance.py:1089} INFO - Executing <Task(DockerOperator): docker_command_hello> on 2021-05-26T02:40:13.171571+00:00
[2021-05-26 02:45:26,080] {standard_task_runner.py:52} INFO - Started process 67 to run task
[2021-05-26 02:45:26,083] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'docker_operator_dag', 'docker_command_hello', '2021-05-26T02:40:13.171571+00:00', '--job-id', '11', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/docker_job/docker-job.py', '--cfg-path', '/tmp/tmp3vl5dv7x', '--error-file', '/tmp/tmptazwx_tc']
[2021-05-26 02:45:26,084] {standard_task_runner.py:77} INFO - Job 11: Subtask docker_command_hello
[2021-05-26 02:45:26,181] {logging_mixin.py:104} INFO - Running <TaskInstance: docker_operator_dag.docker_command_hello 2021-05-26T02:40:13.171571+00:00 [running]> on host f1e5cfe4a07f
[2021-05-26 02:45:26,219] {taskinstance.py:1283} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=docker_operator_dag
AIRFLOW_CTX_TASK_ID=docker_command_hello
AIRFLOW_CTX_EXECUTION_DATE=2021-05-26T02:40:13.171571+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-05-26T02:40:13.171571+00:00
[2021-05-26 02:45:26,227] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 677, in urlopen
chunked=chunked,
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 392, in _make_request
conn.request(method, url, **httplib_request_kw)
File "/usr/local/lib/python3.6/http/client.py", line 1287, in request
self._send_request(method, url, body, headers, encode_chunked)
File "/usr/local/lib/python3.6/http/client.py", line 1333, in _send_request
self.endheaders(body, encode_chunked=encode_chunked)
File "/usr/local/lib/python3.6/http/client.py", line 1282, in endheaders
self._send_output(message_body, encode_chunked=encode_chunked)
File "/usr/local/lib/python3.6/http/client.py", line 1042, in _send_output
self.send(msg)
File "/usr/local/lib/python3.6/http/client.py", line 980, in send
self.connect()
File "/home/airflow/.local/lib/python3.6/site-packages/docker/transport/unixconn.py", line 43, in connect
sock.connect(self.unix_socket)
ConnectionRefusedError: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/requests/adapters.py", line 449, in send
timeout=timeout
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 727, in urlopen
method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/util/retry.py", line 410, in increment
raise six.reraise(type(error), error, _stacktrace)
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/packages/six.py", line 734, in reraise
raise value.with_traceback(tb)
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 677, in urlopen
chunked=chunked,
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 392, in _make_request
conn.request(method, url, **httplib_request_kw)
File "/usr/local/lib/python3.6/http/client.py", line 1287, in request
self._send_request(method, url, body, headers, encode_chunked)
File "/usr/local/lib/python3.6/http/client.py", line 1333, in _send_request
self.endheaders(body, encode_chunked=encode_chunked)
File "/usr/local/lib/python3.6/http/client.py", line 1282, in endheaders
self._send_output(message_body, encode_chunked=encode_chunked)
File "/usr/local/lib/python3.6/http/client.py", line 1042, in _send_output
self.send(msg)
File "/usr/local/lib/python3.6/http/client.py", line 980, in send
self.connect()
File "/home/airflow/.local/lib/python3.6/site-packages/docker/transport/unixconn.py", line 43, in connect
sock.connect(self.unix_socket)
urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionRefusedError(111, 'Connection refused'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 207, in _retrieve_server_version
return self.version(api_version=False)["ApiVersion"]
File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/daemon.py", line 181, in version
return self._result(self._get(url), json=True)
File "/home/airflow/.local/lib/python3.6/site-packages/docker/utils/decorators.py", line 46, in inner
return f(self, *args, **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 230, in _get
return self.get(url, **self._set_request_timeout(kwargs))
File "/home/airflow/.local/lib/python3.6/site-packages/requests/sessions.py", line 555, in get
return self.request('GET', url, **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/requests/sessions.py", line 542, in request
resp = self.send(prep, **send_kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/requests/sessions.py", line 655, in send
r = adapter.send(request, **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/requests/adapters.py", line 498, in send
raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionRefusedError(111, 'Connection refused'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 287, in execute
self.cli = self._get_cli()
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 319, in _get_cli
return APIClient(base_url=self.docker_url, version=self.api_version, tls=tls_config)
File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 190, in __init__
self._version = self._retrieve_server_version()
File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 215, in _retrieve_server_version
'Error while fetching server API version: {0}'.format(e)
docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', ConnectionRefusedError(111, 'Connection refused'))
[2021-05-26 02:45:26,230] {taskinstance.py:1532} INFO - Marking task as FAILED. dag_id=docker_operator_dag, task_id=docker_command_hello, execution_date=20210526T024013, start_date=20210526T024526, end_date=20210526T024526
[2021-05-26 02:45:26,261] {local_task_job.py:146} INFO - Task exited with return code 1
I'm using the Airflow docker-compose found here https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html and trying to run the following DAG:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.docker_operator import DockerOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
default_args = {
'owner' : 'airflow',
'description' : 'Use of the DockerOperator',
'depend_on_past' : False,
'start_date' : datetime(2021, 5, 1),
'email_on_failure' : False,
'email_on_retry' : False,
'retries' : 1,
'retry_delay' : timedelta(minutes=5)
}
with DAG('docker_operator_dag', default_args=default_args, schedule_interval="5 * * * *", catchup=False) as dag:
start_dag = DummyOperator(
task_id='start_dag'
)
end_dag = DummyOperator(
task_id='end_dag'
)
t1 = BashOperator(
task_id='print_current_date',
bash_command='date'
)
t2 = DockerOperator(
task_id='docker_command_sleep',
image='docker_image_task',
container_name='task___command_sleep',
api_version='auto',
auto_remove=True,
command="/bin/sleep 30",
docker_url="unix://var/run/docker.sock",
network_mode="bridge"
)
t3 = DockerOperator(
task_id='docker_command_hello',
image='docker_image_task',
container_name='task___command_hello',
api_version='auto',
auto_remove=True,
command="/bin/sleep 40",
docker_url="unix://var/run/docker.sock",
network_mode="bridge"
)
t4 = BashOperator(
task_id='print_hello',
bash_command='echo "hello world"'
)
start_dag >> t1
t1 >> t2 >> t4
t1 >> t3 >> t4
t4 >> end_dag
Also, I'm using Windows 10, I've tried adding the following in volumes: - "/var/run/docker.sock:/var/run/docker.sock"
and I've succeeded in Ubuntu, but in Windows doesn't.
As requested, follow the docker-compose file:
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow.
# Default: apache/airflow:master-python3.8
# AIRFLOW_UID - User ID in Airflow containers
# Default: 50000
# AIRFLOW_GID - Group ID in Airflow containers
# Default: 50000
# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account.
# Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account.
# Default: airflow
#
# Feel free to modify this file to suit your needs.
---
version: '3'
x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.2}
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 5 # Just to have a fast load in the front-end. Do not use it in production with those configurations.
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
AIRFLOW__CORE__ENABLE_XCOM_PICKLING: 'true' # "_run_image of the DockerOperator returns now a python string, not a byte string" Ref: https://github.com/apache/airflow/issues/13487
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
- "/var/run/docker.sock:/var/run/docker.sock" # We will pass the Docker Deamon as a volume to allow the webserver containers start docker images. Ref: https://stackoverflow.com/q/51342810/7024760
user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
depends_on:
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always
redis:
image: redis:latest
ports:
- 6379:6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
airflow-scheduler:
<<: *airflow-common
command: scheduler
restart: always
airflow-worker:
<<: *airflow-common
command: celery worker
restart: always
airflow-init:
<<: *airflow-common
command: version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
flower:
<<: *airflow-common
command: celery flower
ports:
- 5555:5555
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 10s
timeout: 10s
retries: 5
restart: always
volumes:
postgres-db-volume: