I have docker with airflow to operate on DAGs. Want to import SnowflakeOperatorAsync from astronomer.providers.snowflake. Every time I reinstall python, change environment settings etc. my airflow gets an error
Broken DAG: [/opt/airflow/dags/astro_orders.py] Traceback (most recent call last):
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/opt/airflow/dags/astro_orders.py", line 12, in <module>
from astronomer.providers.snowflake.operators.snowflake import SnowflakeOperatorAsync
ModuleNotFoundError: No module named 'astronomer'
from datetime import datetime
from airflow.models import DAG
from pandas import DataFrame
from airflow.models.baseoperator import chain
from airflow.providers.common.sql.operators.sql import SQLColumnCheckOperator, SQLTableCheckOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import datetime
from airflow.utils.task_group import TaskGroup
from astronomer.providers.snowflake.operators.snowflake import SnowflakeOperatorAsync
S3_FILE_PATH = "s3://kack-astrosdk/"
S3_CONN_ID = "aws_default"
SNOWFLAKE_CONN_ID = "snowflake_default"
SNOWFLAKE_ORDERS = "orders_table"
SNOWFLAKE_FILTERET_ORDERS = "filtered_table"
SNOWFLAKE_JOINED = "joined_table"
SNOWFLAKE_CUSTOMERS = "customers_table"
SNOWFLAKE_REPORTING = "reporting_table"
SNOWFLAKE_FORESTFIRE_TABLE = "forestfires"
with DAG(
dag_id='snowflake_dag',
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False
) as dag:
create_table = SnowflakeOperatorAsync(
task_id='create_table',
snowflake_conn_id='snowflake_connection',
sql='CREATE TABLE test (Imie VARCHAR, Nazwisko VARCHAR, Numer INT)',
)
insert_data_1 = SnowflakeOperatorAsync(
task_id='insert_data_1',
snowflake_conn_id='snowflake_connection',
sql="INSERT INTO test (Imie, Nazwisko, Numer) VALUES ('Marcin', 'Laczek', 1), ('Jadwiga', 'Herbut', 2)",
)
insert_data_2 = SnowflakeOperatorAsync(
task_id='insert_data_2',
snowflake_conn_id='snowflake_connection',
sql="INSERT INTO test (Imie, Nazwisko, Numer) VALUES ('Stanislaw', 'Maczek', 3), ('Karolina', 'HohoĊ', 4)",
)
create_table >> [insert_data_1, insert_data_2]
Does anyone has an idea how to solve this?
I've tried clear python reinstallation, setting up my Airflow Docker from start.