27

I'm trying to access external files in an Airflow task to read some SQL, and I'm getting "file not found". Has anyone come across this?

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

dag = DAG(
    'my_dat',
    start_date=datetime(2017, 1, 1),
    catchup=False,
    schedule_interval=timedelta(days=1)
)

def run_query():
    # read the query
    query = open('sql/queryfile.sql')
    # run the query
    execute(query)

tas = PythonOperator(
    task_id='run_query', dag=dag, python_callable=run_query)

The log states the following:

IOError: [Errno 2] No such file or directory: 'sql/queryfile.sql'

I understand that I could simply copy and paste the query inside the same file, but it's really not a neat solution. There are multiple queries and the text is really big; embedding it in the Python code would compromise readability.

Alessandro Mariani

4 Answers

24

Here is an example using Variable to make it easy.

  • First, add a Variable in the Airflow UI -> Admin -> Variables, e.g. {key: 'sql_path', value: 'your_sql_script_folder'}

  • Then add the following code in your DAG to read the Variable from Airflow.

DAG code:

import airflow
from airflow.models import Variable
from datetime import datetime

# read the folder path stored in the 'sql_path' Variable
tmpl_search_path = Variable.get("sql_path")

default_args = {
    'start_date': datetime(2017, 1, 1),
}

dag = airflow.DAG(
    'tutorial',
    schedule_interval="@daily",
    template_searchpath=tmpl_search_path,  # this
    default_args=default_args
)
  • Now you can reference the SQL script by name, or by a path relative to the folder stored in the Variable (see the sketch after this list)

  • You can learn more in the Airflow documentation
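
For example, with template_searchpath set, an operator with a templated sql field can take just the file name. A minimal sketch, assuming a Postgres connection (the postgres_conn_id 'my_postgres' is a placeholder) and that queryfile.sql sits in the folder stored in the sql_path Variable:

from airflow.operators.postgres_operator import PostgresOperator

run_query = PostgresOperator(
    task_id='run_query',
    postgres_conn_id='my_postgres',  # placeholder connection id
    sql='queryfile.sql',             # resolved against template_searchpath at render time
    dag=dag,
)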

Felipe Augusto
zhongjiajie
  • Please, could you provide a full example? Defining `template_searchpath`, what does that change in the overall script behaviour? Can I reference the file by its name now? For example, would this complete your example: ``` with open(query_file_name, 'r') as file: query_content = file.read() ``` ? – ricoms Feb 12 '20 at 20:57
  • I don't think this would work with the example DAG the OP uses with `PythonOperator` and Python's native `open()`. The PythonOperator runs in a pod which doesn't have access to the same set of locations as the process which parses the DAGs. – LondonRob Feb 17 '20 at 16:25
  • @RicardoMS Hi, when you want to define your own `airflow.models.Variable`, the easiest way is through the Airflow UI, `homepage -> Admin -> Variables`, to create a new variable, e.g. `{'Key': 'RicardoMS_variable', 'Val': '/opt/specific/path'}`. After you're done, you can use the example code to load your variable with `tmpl_search_path = Variable.get("RicardoMS_variable")` instead of using `'/opt/specific/path'` directly. – zhongjiajie Feb 18 '20 at 10:47
  • @LondonRob I'm having the issue that you're pointing out. The `$AIRFLOW_HOME` env var is set to `/opt/***` and even if I use its value `/opt/airflow` directly in a file path it is changed to `/opt/***` automagically and I get file not found errors. – user2268997 Apr 17 '23 at 21:38
  • @user2268997 accessing files on disk during Task execution is tricky because the actual Python code you're trying to execute doesn't run on the same machine as your Airflow installation: it runs in its own infrastructure. So if you need your tasks to have access to files on disk when they run, you'll have to plan for that explicitly. Maybe load the file during the parsing of the DAG Definition file then pass the result to the task as a string? – LondonRob Apr 18 '23 at 14:04
  • @LondonRob it turned out that there was a typo and the code was good, but I had mounted the folder from the container on my local machine. Is it coincidental that it worked, perhaps because it was a local installation on a single machine, or is that expected behaviour? – user2268997 Apr 21 '23 at 16:41
9

Assuming that the sql directory is relative to the current Python file, you can figure out the absolute path to the sql file like this:

import os

CUR_DIR = os.path.abspath(os.path.dirname(__file__))

def run_query():
    # read the query
    query = open(f"{CUR_DIR}/sql/queryfile.sql")
    # run the query
    execute(query)
Jake
8

All relative paths are taken in reference to the AIRFLOW_HOME environment variable. Try:

  • Giving an absolute path
  • Placing the file relative to AIRFLOW_HOME
  • Logging the working directory (PWD) in the Python callable and then deciding what path to give (best option; see the sketch below)
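
A minimal sketch of the third option, reusing the run_query callable from the question (execute() remains the placeholder from the question); it logs the working directory and, as one possible fix, builds an absolute path from it:

import os
import logging

def run_query():
    # log the directory the task actually runs in (visible in the task log)
    logging.info("current working directory: %s", os.getcwd())
    # one option: build an absolute path from that directory
    sql_path = os.path.join(os.getcwd(), 'sql', 'queryfile.sql')
    with open(sql_path) as f:
        query = f.read()
    execute(query)  # placeholder from the question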
Priyank Mehta
  • Good comment, but unfortunately AIRFLOW_HOME is an optional environment variable - Airflow works just fine without it - and you can't guarantee that it will be set. – Kirk Broadhurst Nov 15 '17 at 21:05
1

You can get the DAG directory as below.

import os
from airflow.configuration import conf

# DAGs folder from the Airflow configuration
dags_folder = conf.get('core', 'DAGS_FOLDER')

# open a file that lives inside the DAGs folder
open(os.path.join(dags_folder, 'something.json'), 'r')

ref: https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dags-folder

tsugitta