Here is a very basic example of reading data from a database into a dataframe.
# Get the hook (the MySQL provider package, apache-airflow-providers-mysql, must be installed)
from airflow.providers.mysql.hooks.mysql import MySqlHook
mysqlserver = MySqlHook("Employees")
# Execute the query and get the result as a pandas dataframe
df = mysqlserver.get_pandas_df(sql="select * from employees LIMIT 10")
Kudos to y2k-shubham for the get_pandas_df() tip.
I also save the dataframe to a file in order to pass it to the next task (this is not recommended when running on a cluster, since the next task could be executed on a different worker that does not share the same filesystem).
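If the query result is small, an alternative (my own sketch, not part of the original code) is to serialize the dataframe to JSON and pass it directly through XCom, which avoids the shared-filesystem problem. Keep in mind that XCom values are stored in Airflow's metadata database, so this only makes sense for small results. The connection id and table below are the same assumptions as in the example above.
def export_small_func(task_instance):
    from airflow.providers.mysql.hooks.mysql import MySqlHook

    df = MySqlHook("Employees").get_pandas_df(sql="select * from employees LIMIT 10")
    # XCom lives in the metadata DB, so only push small payloads
    task_instance.xcom_push(key="employees_json", value=df.to_json(orient="records"))
def import_small_func(task_instance):
    import pandas as pd

    payload = task_instance.xcom_pull(key="employees_json")
    df = pd.read_json(payload, orient="records")
    print(df)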
The full code below should work as-is.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
# MySqlHook now lives in the MySQL provider package (older Airflow versions used airflow.hooks.mysql_hook)
from airflow.providers.mysql.hooks.mysql import MySqlHook

dag_id = "db_test"
args = {
    "owner": "airflow",
}
# Folder must exist and be writable by the worker
base_file_path = "dags/files/"

def export_func(task_instance):
    import time

    # Get the hook
    mysqlserver = MySqlHook("Employees")
    # Execute the query
    df = mysqlserver.get_pandas_df(sql="select * from employees LIMIT 10")
    # Generate somewhat unique filename
    path = "{}{}_{}.ftr".format(base_file_path, dag_id, int(time.time()))
    # Save as a binary feather file (requires pyarrow)
    df.to_feather(path)
    print("Export done")
    # Push the path to xcom
    task_instance.xcom_push(key="path", value=path)

def import_func(task_instance):
    import pandas as pd

    # Get the path from xcom
    path = task_instance.xcom_pull(key="path")
    # Read the binary file
    df = pd.read_feather(path)
    print("Import done")
    # Do what you want with the dataframe
    print(df)

with DAG(
    dag_id,
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["test"],
) as dag:
    export_task = PythonOperator(
        task_id="export_df",
        python_callable=export_func,
    )
    import_task = PythonOperator(
        task_id="import_df",
        python_callable=import_func,
    )

    export_task >> import_task
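As a side note (my own variation, not how the code above does it): the PythonOperator pushes the callable's return value to XCom automatically, so you could drop the explicit xcom_push and pull the path by task id instead. A minimal sketch, assuming the same DAG, connection id, and task ids as above:
def export_func():
    import time
    from airflow.providers.mysql.hooks.mysql import MySqlHook

    df = MySqlHook("Employees").get_pandas_df(sql="select * from employees LIMIT 10")
    path = "{}{}_{}.ftr".format(base_file_path, dag_id, int(time.time()))
    df.to_feather(path)
    # The returned value is pushed to XCom under the key "return_value"
    return path
def import_func(task_instance):
    import pandas as pd

    # Pull the export task's return value by task id
    path = task_instance.xcom_pull(task_ids="export_df")
    df = pd.read_feather(path)
    print(df)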