I have a connection to a local Postgres DB. I'm trying to take some rows from a table in the task task_get_titanic_data (callable get_titanic_data) and apply some changes to them in a second task, task_process_titanic_data (callable process_titanic_data).
import pandas as pd
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
def get_titanic_data():
    sql_stmt = 'select * from titanic limit 100'
    pg_hook = PostgresHook(
        postgres_conn_id='postgres_db',
        schema='airflow_database'
    )
    pg_conn = pg_hook.get_conn()
    cursor = pg_conn.cursor()
    cursor.execute(sql_stmt)
    result = cursor.fetchall()
    return result


def process_titanic_data(ti):
    # print(ti.xcom_pull(task_ids=['get_titanic_data']))
    titanic = ti.xcom_pull(task_ids=['get_titanic_data'])
    if not titanic:
        raise Exception('No data')
    titanic = pd.DataFrame(data=titanic, columns=[
        'PassengerId', 'Survived',
        'Pclass', 'Name',
        'Sex', 'Age',
        'SibSp', 'Parch',
        'Fare', 'Embarked'
    ])
    titanic = titanic[(titanic.Age <= 16) & (titanic.Pclass == 1) & (titanic.Survived == 1)]
    titanic.to_csv(Variable.get('tmp_titanic_csv_location'), index=False)


with DAG(
    dag_id='postgres_db_dag',
    schedule_interval='@daily',
    start_date=datetime(year=2023, month=2, day=21),
    catchup=False
) as dag:
    task_get_titanic_data = PythonOperator(
        task_id='get_titanic_data',
        python_callable=get_titanic_data,
        # do_xcom_push=True
    )
    task_process_titanic_data = PythonOperator(
        task_id='process_titanic_data',
        python_callable=process_titanic_data
    )
When I fetch the data with the first task, everything works.
The problem is that when I start the second task, it raises my own Exception ('No data') from the process_titanic_data function:
- Why?
- How can I fix this?
I have no idea what causes this. All the Variables seem to be created and all the paths look correct.
I tried inspecting the task instance to understand the cause, checked all the Variables and paths, and searched Google as much as I could. I also saw and tried this. Nothing helped.
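A minimal sketch of one possible fix, based on two guesses about the DAG above that the question does not confirm. First, no dependency is declared between the two tasks, so Airflow may schedule them side by side in the same DAG run; if process_titanic_data starts before get_titanic_data has finished (or is run on its own), there is no XCom to pull yet and the 'No data' branch fires. Adding task_get_titanic_data >> task_process_titanic_data at the end of the with DAG(...) block enforces the order. Second, when task_ids is a list, recent Airflow 2 versions return a sequence of values (one per matching task) rather than the value itself, so even with data present the DataFrame would receive the whole result set wrapped in one extra level; passing task_ids='get_titanic_data' as a plain string returns the rows directly. To keep the sketch runnable without Airflow or Postgres, the output path is a parameter instead of Variable.get, the csv module stands in for pandas, and FakeTaskInstance is a hypothetical stub, not an Airflow class. The column order is assumed to match select * from titanic.

```python
import csv
from typing import Any

# Column order copied from the question's DataFrame; assumed to match the
# column order returned by "select * from titanic".
COLUMNS = ['PassengerId', 'Survived', 'Pclass', 'Name', 'Sex', 'Age',
           'SibSp', 'Parch', 'Fare', 'Embarked']


def process_titanic_data(ti: Any, output_path: str) -> int:
    """Pull the rows pushed by get_titanic_data, filter them, write a CSV.

    Returns the number of rows written (a small value that is safe to push
    to XCom as this task's own return value).
    """
    # A plain string task id returns that task's return value (the rows),
    # not a sequence wrapping it.
    rows = ti.xcom_pull(task_ids='get_titanic_data')
    if not rows:
        raise ValueError('No data: get_titanic_data pushed nothing for this DAG run')

    # Rows may arrive as tuples or lists; zip works for both.
    records = [dict(zip(COLUMNS, row)) for row in rows]

    # Same filter as the question: children (Age <= 16) in first class who
    # survived. Rows with a missing Age (None) are skipped instead of raising
    # a TypeError on the comparison.
    selected = [
        r for r in records
        if r['Age'] is not None and r['Age'] <= 16
        and r['Pclass'] == 1 and r['Survived'] == 1
    ]

    with open(output_path, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=COLUMNS)
        writer.writeheader()
        writer.writerows(selected)
    return len(selected)


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for local testing only."""

    def __init__(self, value: Any) -> None:
        self.value = value

    def xcom_pull(self, task_ids: Any) -> Any:
        # Ignores task_ids and returns whatever the test injected.
        return self.value
```

In the DAG itself, Airflow fills ti from the task context because the parameter is named ti, and output_path could be passed as op_kwargs={'output_path': '{{ var.value.tmp_titanic_csv_location }}'}, since op_kwargs is a templated field of PythonOperator.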