
I run Airflow in Docker to operate on DAGs and want to import SnowflakeOperatorAsync from astronomer.providers.snowflake. Every time I reinstall Python, change environment settings, etc., my Airflow shows this error:

Broken DAG: [/opt/airflow/dags/astro_orders.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/opt/airflow/dags/astro_orders.py", line 12, in <module>
    from astronomer.providers.snowflake.operators.snowflake import SnowflakeOperatorAsync
ModuleNotFoundError: No module named 'astronomer'
Here is my DAG (astro_orders.py):

from datetime import datetime

from airflow.models import DAG
from pandas import DataFrame
from airflow.models.baseoperator import chain
from airflow.providers.common.sql.operators.sql import SQLColumnCheckOperator, SQLTableCheckOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import datetime
from airflow.utils.task_group import TaskGroup
from astronomer.providers.snowflake.operators.snowflake import SnowflakeOperatorAsync

S3_FILE_PATH = "s3://kack-astrosdk/"
S3_CONN_ID = "aws_default"
SNOWFLAKE_CONN_ID = "snowflake_default"
SNOWFLAKE_ORDERS = "orders_table"
SNOWFLAKE_FILTERED_ORDERS = "filtered_table"
SNOWFLAKE_JOINED = "joined_table"
SNOWFLAKE_CUSTOMERS = "customers_table"
SNOWFLAKE_REPORTING = "reporting_table"
SNOWFLAKE_FORESTFIRE_TABLE = "forestfires"


with DAG(
    dag_id='snowflake_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
) as dag:
    create_table = SnowflakeOperatorAsync(
        task_id='create_table',
        snowflake_conn_id='snowflake_connection',
        sql='CREATE TABLE test (Imie VARCHAR, Nazwisko VARCHAR, Numer INT)',
    )
    insert_data_1 = SnowflakeOperatorAsync(
        task_id='insert_data_1',
        snowflake_conn_id='snowflake_connection',
        sql="INSERT INTO test (Imie, Nazwisko, Numer) VALUES ('Marcin', 'Laczek', 1), ('Jadwiga', 'Herbut', 2)",
    )
    insert_data_2 = SnowflakeOperatorAsync(
        task_id='insert_data_2',
        snowflake_conn_id='snowflake_connection',
        sql="INSERT INTO test (Imie, Nazwisko, Numer) VALUES ('Stanislaw', 'Maczek', 3), ('Karolina', 'HohoĊ‚', 4)",
    ) 

    create_table >> [insert_data_1, insert_data_2]

Does anyone have an idea how to solve this?

I've tried a clean Python reinstallation and setting up my Airflow Docker environment from scratch.


1 Answer


The Airflow Docker image does not come with the astronomer-providers package, so you will have to install it in your image, or you can use the Astro Runtime image instead.

To install it in your image, add the line below to your Dockerfile:

RUN pip install astronomer-providers
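
For context, a minimal Dockerfile for this could look like the sketch below. It assumes you build on the official apache/airflow base image (the tag is a placeholder; use the version you already run), and it also installs apache-airflow-providers-snowflake, which your DAG needs anyway for the plain SnowflakeOperator import in case it is not already present in your image:

FROM apache/airflow:2.5.3
# astronomer-providers supplies the astronomer.providers.* modules,
# including SnowflakeOperatorAsync
# apache-airflow-providers-snowflake supplies airflow.providers.snowflake
RUN pip install --no-cache-dir \
        astronomer-providers \
        apache-airflow-providers-snowflake

If you use the official docker-compose.yaml, point the services at this Dockerfile (replace image: with build: . or set your custom image name) and run docker compose build, so the scheduler and webserver containers are actually rebuilt with the package.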

Alternatively, get the Astro Runtime image from https://quay.io/repository/astronomer/astro-runtime; it comes with astronomer-providers preinstalled.
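
If you go that route, the only change to the Dockerfile sketch above is the base image; the version tag below is a placeholder, to be replaced with one listed at the link above:

FROM quay.io/astronomer/astro-runtime:<version>
# astronomer-providers is preinstalled in Astro Runtime, so the
# astronomer.providers imports resolve without an extra pip install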

Example: https://github.com/astronomer/astronomer-providers/blob/main/.circleci/integration-tests/Dockerfile.astro_cloud#L2
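
After rebuilding, you can verify from inside a running container that the package is installed. Assuming the official docker-compose.yaml, where the scheduler service is named airflow-scheduler, something like this should list the package:

docker compose exec airflow-scheduler pip show astronomer-providers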
