I'm building a data flow with Apache Airflow that grabs some data into a pandas DataFrame and then stores it in MongoDB. I have two Python methods: one fetches the data and returns the DataFrame, and the other stores it in the relevant database. How do I take the output of one task and feed it as the input to another task?
I looked into the concept of XCom push and pull, and that's what I've tried to implement below. I also saw that there's a MongoHook for Airflow, but I wasn't quite sure how to use it.
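If I understand the docs correctly, the hook reads the credentials from an Airflow connection instead of hard-coding them in the DAG file. This is my rough, untested guess at how it would be used; the contrib import path and the 'mongo_default' connection ID (which I assume has to be configured in the Airflow UI) are guesses on my part:

from airflow.contrib.hooks.mongo_hook import MongoHook

def push_to_db_with_hook(df, dbname, collection):
    # the hook builds the pymongo client from the 'mongo_default' connection,
    # so no credentials need to live in the DAG file
    hook = MongoHook(conn_id='mongo_default')
    hook.insert_many(collection, df.to_dict(orient='records'), mongo_db=dbname)

In any case, here is what I have so far (summarized and condensed):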
import pandas as pd
import pymongo
import airflow
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
def get_data(name, **context):
    data = pd.read_csv('dataset.csv')
    df = data.loc[data.name == name]
    context['ti'].xcom_push(task_ids=['get-data'], value=data)
def push_to_db(df, dbname, collection):
    client = pymongo.MongoClient(-insert creds here-)
    db = client[dbname][collection]
    data = df.to_dict(orient='records')
    db.insert_many(data)
args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'retries': 2,
}

dag = DAG(
    dag_id='simple_xcom',
    default_args=args,
    schedule_interval="@daily",
)
task1 = PythonOperator(task_id='get-data',
                       python_callable=get_data,
                       params={'name': 'John'},
                       provide_context=True, dag=dag)
task2 = PythonOperator(task_id='load-db',
                       params={'df': context['ti'].xcom_pull(task_ids=['get-data'], key='data'),
                               'dbname': 'person', 'collection': 'salary'},
                       python_callable=push_to_db,
                       provide_context=True, dag=dag)
task1 >> task2
Every time I try to run it, it fails with an error saying that the context does not exist. Am I doing something wrong in how I feed the output of one task as the input to the other?
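My current guess is that the xcom_pull has to happen at run time, inside the second task's callable, rather than at DAG-definition time, since the task instance context only exists while a task is running. Something like this, untested, and assuming XCom can actually carry a DataFrame between tasks (which I'm not sure about):

def get_data(name, **context):
    data = pd.read_csv('dataset.csv')
    df = data.loc[data.name == name]
    # push the filtered frame under an explicit key
    context['ti'].xcom_push(key='data', value=df)

def push_to_db(dbname, collection, **context):
    # pull inside the task, where the context actually exists
    df = context['ti'].xcom_pull(task_ids='get-data', key='data')
    client = pymongo.MongoClient('<insert creds here>')
    client[dbname][collection].insert_many(df.to_dict(orient='records'))

task1 = PythonOperator(task_id='get-data', python_callable=get_data,
                       op_kwargs={'name': 'John'},
                       provide_context=True, dag=dag)
task2 = PythonOperator(task_id='load-db', python_callable=push_to_db,
                       op_kwargs={'dbname': 'person', 'collection': 'salary'},
                       provide_context=True, dag=dag)
task1 >> task2

Is that the right pattern, or is there a cleaner way to pass the DataFrame from one task to the other?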