4

I am using PostgresSQL Operator. The task is the following:

emailage_transformations = PostgresOperator(
    task_id = 'emailage_transformations',
    sql = '/home/ubuntu/airflow_ci/current/scripts/antifraud/emailage_transformations.sql',
    postgres_conn_id = 'redshift',
    autocommit = True,
    dag = dag)

At first, the content of the file was the next:

select cd_pedido_nr,fraud_score,risk_band,ip_risk_level
into antifraud.stg_emailage_id_pedido
from antifraud.stg_emailage_id_email e
left join antifraud.info_emails i on id_email = cd_email_nr
;

And the error I got was

jinja2.exceptions.TemplateNotFound: /home/ubuntu/airflow_ci/current/scripts/antifraud/emailage_transformations.sql

So I have added a couple of brackets to the query for complying with jinja2 templating and now the file code is:

{select cd_pedido_nr,fraud_score,risk_band,ip_risk_level
into antifraud.stg_emailage_id_pedido
from antifraud.stg_emailage_id_email e
left join antifraud.info_emails i on id_email = cd_email_nr
;}

However, I still have the same error. How could I solve it?

Javier Lopez Tomas
  • 2,072
  • 3
  • 19
  • 41

2 Answers2

7

I recon as told in following links, you ought to provide a template_searchpath to your DAG so as to enable it picking your external files (SQL or other files)

Alternatively making external file discoverable, such as by modifying AIRFLOW_HOME or through other tricks can also work

y2k-shubham
  • 10,183
  • 11
  • 55
  • 131
  • 1
    The default template searchpath is the folder where the dag itself is stored, so if the file is in the same folder than the DAG, you don't have to provide a template searchpath – Javier Lopez Tomas Jul 09 '20 at 05:37
0

For default Airflow operators, file paths must be relative (to the DAG folder or to the DAG's template_searchpath property).
But if you really need to use absolute paths, this can be achieved like this:

import pendulum

from airflow.decorators import dag
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


class TestSqlOperator(SQLExecuteQueryOperator):
    def render_template_fields(self, context, jinja_env=None) -> None:
        if not jinja_env:
            jinja_env = self.get_template_env()
            if isinstance(jinja_env.loader.searchpath, list):
                jinja_env.loader.searchpath.append('/')
        super().render_template_fields(context=context, jinja_env=jinja_env)


@dag(
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1),
    catchup=False
)
def abs_path_sql_dag():
    TestSqlOperator(
        task_id="sql_op_abs_path",
        conn_id="<your_connection_id>",
        sql="/your/absolute/file/path.sql"
    )


abs_path_sql_dag()
Inetov
  • 11
  • 4