
I am trying to spin up a testing Pod with the KubernetesPodOperator. As an image I am using the hello-world example from Docker, which I pushed to the local registry of my MicroK8s installation.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.kubernetes.pod import Port
from airflow.utils.dates import days_ago
from datetime import timedelta

ports = [Port('http', 80)]

default_args = {
    'owner': 'user',
    'start_date': days_ago(5),
    'email': ['user@mail'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

workflow = DAG(
    'kubernetes_helloworld',
    default_args=default_args,
    description='Our first DAG',
    schedule_interval=None,
)

op = DummyOperator(task_id='dummy', dag=workflow)

t1 = KubernetesPodOperator(
    dag=workflow,
    namespace='default',
    image='localhost:32000/hello-world:registry',
    name='pod2',
    task_id='pod2',
    is_delete_operator_pod=True,
    hostnetwork=False,
    get_logs=True,
    do_xcom_push=False,
    in_cluster=False,
    ports=ports,
    )

op >> t1

When I trigger the DAG, it keeps running and re-attempts to launch the pod indefinitely. This is the log output I get in Airflow:

Reading local file: /home/user/airflow/logs/kubernetes_helloworld/pod2/2021-03-17T16:25:11.142695+00:00/4.log
[2021-03-17 16:30:00,315] {taskinstance.py:851} INFO - Dependencies all met for <TaskInstance: kubernetes_helloworld.pod2 2021-03-17T16:25:11.142695+00:00 [queued]>
[2021-03-17 16:30:00,319] {taskinstance.py:851} INFO - Dependencies all met for <TaskInstance: kubernetes_helloworld.pod2 2021-03-17T16:25:11.142695+00:00 [queued]>
[2021-03-17 16:30:00,319] {taskinstance.py:1042} INFO - 
--------------------------------------------------------------------------------
[2021-03-17 16:30:00,320] {taskinstance.py:1043} INFO - Starting attempt 4 of 1
[2021-03-17 16:30:00,320] {taskinstance.py:1044} INFO - 
--------------------------------------------------------------------------------
[2021-03-17 16:30:00,330] {taskinstance.py:1063} INFO - Executing <Task(KubernetesPodOperator): pod2> on 2021-03-17T16:25:11.142695+00:00
[2021-03-17 16:30:00,332] {standard_task_runner.py:52} INFO - Started process 9021 to run task
[2021-03-17 16:30:00,335] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'kubernetes_helloworld', 'pod2', '2021-03-17T16:25:11.142695+00:00', '--job-id', '57', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/kubernetes_helloworld.py', '--cfg-path', '/tmp/tmp5ss4g6q4', '--error-file', '/tmp/tmp9t3l8emt']
[2021-03-17 16:30:00,336] {standard_task_runner.py:77} INFO - Job 57: Subtask pod2
[2021-03-17 16:30:00,357] {logging_mixin.py:104} INFO - Running <TaskInstance: kubernetes_helloworld.pod2 2021-03-17T16:25:11.142695+00:00 [running]> on host 05nclorenzvm01.internal.cloudapp.net
[2021-03-17 16:30:00,369] {taskinstance.py:1255} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=user
AIRFLOW_CTX_DAG_OWNER=user
AIRFLOW_CTX_DAG_ID=kubernetes_helloworld
AIRFLOW_CTX_TASK_ID=pod2
AIRFLOW_CTX_EXECUTION_DATE=2021-03-17T16:25:11.142695+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-03-17T16:25:11.142695+00:00
[2021-03-17 16:32:09,805] {connectionpool.py:751} WARNING - Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f812fc23eb0>: Failed to establish a new connection: [Errno 110] Connection timed out')': /api/v1/namespaces/default/pods?labelSelector=dag_id%3Dkubernetes_helloworld%2Cexecution_date%3D2021-03-17T162511.1426950000-e549b02ea%2Ctask_id%3Dpod2

When I launch the pod in Kubernetes itself, without Airflow, it runs fine. What am I doing wrong?

I tried the following things:

  • Prevent the container from exiting with sleep commands
  • Try different images, e.g. PySpark
  • Reinstall Airflow and MicroK8s

Airflow v2.0.1, MicroK8s v1.3.7, Python 3.8, Ubuntu 18.04 LTS
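Given the [Errno 110] timeout in the log above, one diagnostic step (not part of the original post, just a suggestion) is to check whether the MicroK8s API server is reachable at all from the host Airflow runs on; MicroK8s serves its API on port 16443 by default:

```shell
# Probe the MicroK8s API server from the Airflow host.
# 16443 is the MicroK8s default API port; adjust if yours differs.
curl -k --max-time 5 https://127.0.0.1:16443/version \
  || echo "API server not reachable - same timeout Airflow would hit"
```

If this hangs or times out, the problem is network-level (firewall, or a wrong server address in the kubeconfig) rather than anything in the DAG itself.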

  • Did you activate microk8s dns addons ? – Adil Blanco Mar 19 '21 at 20:40
  • Which airflow version? Do you use astronomer + airflow ? – Adil Blanco Mar 20 '21 at 00:40
  • I don't have the dns addon enabled...is that necessary? Also, I am not using Astronomer. All versions are at the end of my post. – Lorenz Mar 25 '21 at 14:07
  • dns deploys CoreDNS to supply address resolution services to Kubernetes. This service is commonly required by other addons, so it is recommended that you enable it. https://microk8s.io/docs/addon-dns – Adil Blanco Mar 30 '21 at 02:02
  • @Lorenz Could you share the docker-compose file you used to spin up the local Airflow with KubernetesExecutor please ? – val Sep 10 '21 at 08:19
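As a follow-up to the comment thread above: the DNS addon the commenters mention can be enabled with a single command (this assumes microk8s is on the PATH):

```shell
# Deploy CoreDNS into the cluster; many addons and in-cluster
# workloads depend on it for name resolution.
microk8s enable dns

# Confirm the CoreDNS pod comes up in kube-system
microk8s.kubectl get pods -n kube-system -l k8s-app=kube-dns
```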

2 Answers


Unfortunately, I still haven't figured out the problem with MicroK8s.

But I was able to use the KubernetesPodOperator in Airflow with minikube. The following code was able to run without any problems:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow import configuration as conf
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'user',
    'start_date': days_ago(5),
    'email': ['user@airflow.de'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

namespace = conf.get('kubernetes', 'NAMESPACE')

if namespace == 'default':
    config_file = '/home/user/.kube/config'
    in_cluster = False
else:
    in_cluster = True
    config_file = None

dag = DAG('example_kubernetes_pod',
          schedule_interval='@once',
          default_args=default_args)

with dag:
    k = KubernetesPodOperator(
        namespace=namespace,
        image="hello-world",
        labels={"foo": "bar"},
        name="airflow-test-pod",
        task_id="task-one",
        in_cluster=in_cluster, # if set to true, will look in the cluster, if false, looks for file
        cluster_context='minikube', # is ignored when in_cluster is set to True
        config_file=config_file,
        is_delete_operator_pod=True,
        get_logs=True)
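With the file in the DAGs folder, the DAG above can be triggered from the CLI (Airflow 2.x syntax; adjust the DAG id if you renamed it):

```shell
# Trigger a manual run of the example DAG
airflow dags trigger example_kubernetes_pod

# Check the state of its runs
airflow dags list-runs -d example_kubernetes_pod
```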

To answer your question: I suppose you are running the task on a local MicroK8s cluster, without a VM.

Airflow may not be able to connect to the Kubernetes control plane to trigger the pod. Add cluster_context='microk8s' and point config_file at your MicroK8s kubeconfig:

t1 = KubernetesPodOperator(
         dag=workflow,
         namespace='default',
         image='localhost:32000/hello-world:registry',
         name='pod2',
         task_id='pod2',
         is_delete_operator_pod=True,
         get_logs=True,
         do_xcom_push=False,
         in_cluster=False,
         cluster_context='microk8s',
         config_file='/path/to/config',
         ports=ports,
     )

To see the cluster context used, run the following command and redirect its output to a config file in your Airflow project:

microk8s.kubectl config view --flatten > config

Output:

apiVersion: v1
clusters:
- cluster:
    certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0.............
    server: https://127.0.0.1:16443
  name: microk8s-cluster
contexts:
- context:
    cluster: microk8s-cluster
    user: admin
  name: microk8s
current-context: microk8s
kind: Config
preferences: {}
users:
- name: admin
  user:
    token: SldHNFQ3ek9yUGh4TVhWN......................................
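To confirm the exported file actually works before handing it to Airflow, you can point kubectl at it directly (this assumes the file was saved as ./config, matching the redirect above):

```shell
# Use the exported kubeconfig explicitly; if this times out,
# Airflow will fail with the same [Errno 110] connection error.
kubectl --kubeconfig ./config get pods -n default
```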
  • Thank you for your answer Adil. I just tried it with the cluster_context attribute added, but unfortunately it doesn't make a difference. I still get the "Failed to establish a new connection: [Errno 110] Connection timed out" exception. So clearly Airflow is not able to contact microK8s. Do you have another idea to solve this? – Lorenz Mar 30 '21 at 19:11
  • I'm getting the same error as in this post: https://stackoverflow.com/questions/55742540/kubernetes-python-client-connection-issue Unfortunately I don't know how to apply the fix in the context of Airflow. Do you have an idea? – Lorenz Mar 30 '21 at 19:59
  • It's not easy for me to diagnose your problem as long as I have no idea about your Airflow installation. I strongly advise you to install Airflow via Astronomer: https://www.astronomer.io/docs/cloud/stable/develop/cli-quickstart – Adil Blanco Mar 30 '21 at 21:49
  • I tried the exact same example as in this post by Astronomer: https://www.astronomer.io/docs/cloud/stable/develop/kubepodoperator-local which also references the config_file as in your answer, but it doesn't make a difference. I might try to reinstall Airflow with Astronomer or inside Docker. – Lorenz Mar 31 '21 at 11:48
  • I don't understand your question :( In short, with Astronomer you don't need to install Airflow: just create a directory for your project and run the `astro dev init` command to generate the project structure, then create the config file in `include/kube/config`, and finally run `astro dev start`; astro takes care of the rest. – Adil Blanco Mar 31 '21 at 18:50