
I want to read data from SQL into a Pandas dataframe, do some data transformations, and then load the result into another table using Airflow.

The issue I am facing is that the connection strings to the tables are accessible only through Airflow. So I need to use Airflow as the medium to read and write data.

How can this be done?

My code:

Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="SELECT * FROM Western.trip limit 5 ",
    params={'limit': '50'},
    dag=dag
)

The output of the task needs to be stored in a dataframe (df) and, after transformations, loaded back into another table.

How can this be done?

Rahul rajan

2 Answers


I doubt there's an in-built operator for this. You can easily write a custom operator:

  • Extend PostgresOperator, or just BaseOperator / any other operator of your choice. All custom code goes into the overridden execute() method
  • Then use PostgresHook to obtain a Pandas DataFrame by invoking the get_pandas_df() function
  • Perform whatever transformations you have to do on your pandas df
  • Finally, use the insert_rows() function to insert the data into the destination table

UPDATE-1

As requested, I'm hereby adding the code for the operator:

from typing import Dict, Any, List, Tuple

from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.decorators import apply_defaults
from pandas import DataFrame


class MyCustomOperator(PostgresOperator):

    @apply_defaults
    def __init__(self, destination_table: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.destination_table: str = destination_table

    def execute(self, context: Dict[str, Any]):
        # create PostgresHook
        self.hook: PostgresHook = PostgresHook(postgres_conn_id=self.postgres_conn_id,
                                               schema=self.database)
        # read data from Postgres-SQL query into pandas DataFrame
        df: DataFrame = self.hook.get_pandas_df(sql=self.sql, parameters=self.parameters)
        # perform transformations on df here
        df['column_to_be_doubled'] = df['column_to_be_doubled'].multiply(2)
        # ... any further transformations go here ...
        # convert pandas DataFrame into list of tuples
        rows: List[Tuple[Any, ...]] = list(df.itertuples(index=False, name=None))
        # insert list of tuples in destination Postgres table
        self.hook.insert_rows(table=self.destination_table, rows=rows)

Note: The snippet is for reference only; it has NOT been tested
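
For completeness, a hypothetical instantiation of the operator inside a DAG might look like this (the destination table name is a placeholder, and this is likewise untested):

transform_and_load = MyCustomOperator(
    task_id='transform_and_load',
    postgres_conn_id='REDSHIFT_CONN',
    sql='SELECT * FROM Western.trip LIMIT 50',
    destination_table='Western.trip_transformed',  # hypothetical target table
    dag=dag
)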


Further modifications / improvements

  • The destination_table param can be read from an Airflow Variable
  • If the destination table doesn't necessarily reside in the same Postgres schema, we can take another param like destination_postgres_conn_id in __init__ and use it to create a destination_hook on which we can invoke the insert_rows method (see the sketch below)
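
A rough sketch of both ideas, building on the MyCustomOperator class above; the Variable key destination_table, the second connection id, and the class name MyCustomOperatorV2 are hypothetical names, and this is likewise untested:

from typing import Any, Dict, List, Tuple

from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import Variable
from airflow.utils.decorators import apply_defaults
from pandas import DataFrame


class MyCustomOperatorV2(MyCustomOperator):

    @apply_defaults
    def __init__(self, destination_postgres_conn_id: str, *args, **kwargs):
        # resolve the target table from an Airflow Variable
        # (note: Variable.get() here runs at DAG-parse time)
        kwargs['destination_table'] = Variable.get('destination_table')
        super().__init__(*args, **kwargs)
        self.destination_postgres_conn_id: str = destination_postgres_conn_id

    def execute(self, context: Dict[str, Any]):
        # read from the source connection, exactly as before
        source_hook = PostgresHook(postgres_conn_id=self.postgres_conn_id,
                                   schema=self.database)
        df: DataFrame = source_hook.get_pandas_df(sql=self.sql, parameters=self.parameters)
        # ... transformations on df go here ...
        rows: List[Tuple[Any, ...]] = list(df.itertuples(index=False, name=None))
        # write through a separate hook that points at the destination database
        destination_hook = PostgresHook(postgres_conn_id=self.destination_postgres_conn_id)
        destination_hook.insert_rows(table=self.destination_table, rows=rows)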
y2k-shubham

Here is a very simple and basic example of reading data from a database into a dataframe.

    # Get the hook
    mysqlserver = MySqlHook("Employees")
    # Execute the query
    df = mysqlserver.get_pandas_df(sql="select * from employees LIMIT 10")

Kudos to y2k-shubham for the get_pandas_df() tip.

I also save the dataframe to a file to pass it to the next task (this is not recommended when using clusters, since the next task could be executed on a different server).

This full code should work as it is.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.hooks.mysql_hook import MySqlHook

dag_id = "db_test"
args = {
    "owner": "airflow",
}

base_file_path = "dags/files/"

def export_func(task_instance):
    import time

    # Get the hook
    mysqlserver = MySqlHook("Employees")
    # Execute the query
    df = mysqlserver.get_pandas_df(sql="select * from employees LIMIT 10")

    # Generate somewhat unique filename
    path = "{}{}_{}.ftr".format(base_file_path, dag_id, int(time.time()))
    # Save as a binary feather file
    df.to_feather(path)
    print("Export done")

    # Push the path to xcom
    task_instance.xcom_push(key="path", value=path)


def import_func(task_instance):
    import pandas as pd

    # Get the path from xcom
    path = task_instance.xcom_pull(key="path")
    # Read the binary file
    df = pd.read_feather(path)

    print("Import done")
    # Do what you want with the dataframe
    print(df)

with DAG(
    dag_id,
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["test"],
) as dag:

    export_task = PythonOperator(
        task_id="export_df",
        python_callable=export_func,
    )

    import_task = PythonOperator(
        task_id="import_df",
        python_callable=import_func,
    )

    export_task >> import_task
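
If the transformed dataframe also has to be loaded into another table (as the question asks), one more task could read the saved file and write it out with insert_rows(). A rough, untested sketch, assuming the upstream task re-saves the transformed dataframe to a feather file and pushes its path the same way, and assuming a placeholder table name employees_copy:

def load_func(task_instance):
    import pandas as pd

    # pull the path of the (transformed) dataframe saved by the upstream task
    path = task_instance.xcom_pull(key="path")
    df = pd.read_feather(path)

    # insert the rows into the destination table via the same hook
    mysqlserver = MySqlHook("Employees")
    rows = list(df.itertuples(index=False, name=None))
    mysqlserver.insert_rows(table="employees_copy",
                            rows=rows,
                            target_fields=list(df.columns))

# added inside the same "with DAG(...)" block:
load_task = PythonOperator(
    task_id="load_df",
    python_callable=load_func,
)

import_task >> load_task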
Thomas J