
I have the following structure in my native Airflow build:

dags/cust_dag.py
dags/jhook.py — contains the class UtilTriggers, which has multiple methods

In the cust_dag code I import that hook/module as:

from jhook import UtilTriggers as trigger

When I check the Airflow UI, cust_dag shows up as a broken DAG with the error ModuleNotFoundError: No module named jhook.

The same kind of code works on Composer 1.9; currently I am running this on native Airflow.

I have also tried adding an __init__.py file, and I created a new folder job_trigger under which I placed that file, but it still does not work.

I have tried the solution mentioned in this question: Apache Airflow DAG cannot import local module,

i.e. adding the lines below in both the custom hook module and the DAG file:

import os
import sys
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))

Please guide me on what could be causing this ModuleNotFoundError when everything looks okay.
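For reference, a package layout along these lines (folder and file names taken from my setup above) is what Python expects for the import to resolve — note the package marker must be named __init__.py with double underscores, not init.py:

```
dags/
├── cust_dag.py          # from job_trigger.jhook import UtilTriggers as trigger
└── job_trigger/
    ├── __init__.py      # double underscores; an empty file is enough
    └── jhook.py         # defines class UtilTriggers
```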

Fusionist

1 Answer


As per your comments, since the error message you are receiving is an import error, the problem seems to be related to Python only.

dag1.py

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

from new1 import hi as h1

default_dag_args = {
  # The start_date describes when a DAG is valid / can be run. Set this to a
  # fixed point in time rather than dynamically, since it is evaluated every
  # time a DAG is parsed. See:
  # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
  'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
      'demo_run',
      schedule_interval=datetime.timedelta(days=1),
      default_args=default_dag_args) as dag:

  # An instance of an operator is called a task. In this case, the
  # hello_python task calls the "greeting" Python function.
  hello_python = python_operator.PythonOperator(
      task_id='hello_world',
      python_callable=h1.funt,
      op_kwargs={"x" : "python"})

  # Likewise, the goodbye_bash task calls a Bash script.
  goodbye_bash = bash_operator.BashOperator(
      task_id='bye',
      bash_command='echo Goodbye.')


new1.py


class hi:
    @staticmethod
    def funt(x):
        return x + " is a programming language"


  1. Since you are using all your methods as static methods, there is no need to pass self to them. The self parameter refers to the instance; because static methods can be called without creating an object, they do not take self.
  2. If your methods take arguments, make sure those arguments are also passed to the DAG tasks by providing the op_args or op_kwargs parameters of the operator.
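As a minimal sketch (runnable outside Airflow) of what PythonOperator does with op_kwargs — it calls python_callable(**op_kwargs) when the task runs, so the keys of op_kwargs must match the callable's parameter names. The class and method names here mirror the new1.py example above:

```python
class hi:
    @staticmethod
    def funt(x):
        return x + " is a programming language"

# PythonOperator(task_id=..., python_callable=hi.funt, op_kwargs={"x": "python"})
# effectively performs this call when the task executes:
op_kwargs = {"x": "python"}
result = hi.funt(**op_kwargs)
print(result)  # python is a programming language
```

If a key in op_kwargs does not match a parameter name of the callable, the task fails at runtime with a TypeError, which is a common source of confusion alongside import errors.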

To answer your question about whether this could be a Kubernetes issue because the environment is hosted there: this issue is not related to Kubernetes, because

  • When you create a Composer environment, the Composer service creates one GKE cluster per environment. The cluster is named and labeled automatically, should not be deleted manually by users, and is created and managed through the Deployment Manager.
  • If the cluster is deleted, the environment becomes irreparable and must be recreated. Kubernetes errors look like “Http error status code: 400 Http error message: BAD REQUEST”, etc.
Sandeep Mohanty
  • Hi Sandeep, thank you for your answer, actually all my methods are static in class hence i did not create object also. code breaks while importing itself, we also tried restarting airflow still did not work out. Can it be an kubernetes issue as it is hosted there ? – Fusionist Jun 15 '21 at 16:04
  • Hi, thank you for your Response. As per the information you have provided, I have modified the solution to use the static methods. If this does not work, please provide your code. – Sandeep Mohanty Jun 16 '21 at 12:11
  • hey thanks, I am not sure what exactly was wrong, I was doing everything you mentioned already but suddenly it started working again. Thanks though. – Fusionist Aug 07 '21 at 13:52