I'm trying to figure out how to process files from S3. Below are some of my ideas and questions; maybe you could also share your experience. The main difficulty is passing a file downloaded from S3 and a pandas DataFrame from one task to another: they are not easily serializable, so I cannot push them through XCom. The only way I could find is to save them on the host filesystem and pass file paths from one task to another.
import pandas as pd

from airflow import DAG
from airflow.decorators import task, task_group
from airflow.utils.dates import days_ago


@task
def download_from_s3(filename: str, bucket_name: str):
    # as far as I can see there is no way to pass bytes from one task to another,
    # and the only way is to store the file in the file system and return the path to it?
    # file size is about 100 MB
    return '???'
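
# A possible sketch for the task above (my assumption: a reasonably recent amazon
# provider package is installed, an 'aws_default' connection is configured, and the
# target directory is only an example); download to disk and return just the path:
#
#     from airflow.providers.amazon.aws.hooks.s3 import S3Hook
#
#     local_path = S3Hook(aws_conn_id='aws_default').download_file(
#         key=filename, bucket_name=bucket_name, local_path='/tmp/airflow_data')
#     return local_path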

@task
def read_file_into_dataframe(file_path: str):
    # the dataframe can be huge - about 1 GB or more
    df = pd.read_excel(file_path)
    # save the dataframe into a file, e.g. HDF5 or Parquet, and return the file path to it?
    return '???'
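
# A possible sketch for the task above (my assumption: pyarrow or fastparquet is
# installed for Parquet support); write the frame next to the source file and
# return only the new path:
#
#     df = pd.read_excel(file_path)
#     dataframe_path = file_path.rsplit('.', 1)[0] + '.parquet'
#     df.to_parquet(dataframe_path)
#     return dataframe_path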

@task
def prepare_dataframe_for_working(dataframe_path: str):
    # set index, remove some blank rows and columns
    df = pd.read_hdf(dataframe_path)
    # do some processing
    return dataframe_path
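
# A possible sketch for the task above (my assumption: PyTables is installed for HDF5
# I/O, and the cleaning steps are only placeholders); persist the processed frame back
# to the same path so that validate() and save() read the prepared data:
#
#     df = pd.read_hdf(dataframe_path)
#     df = df.dropna(how='all').dropna(axis=1, how='all')
#     df.to_hdf(dataframe_path, key='data', mode='w')
#     return dataframe_path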

def _validate(df: pd.DataFrame):
    # placeholder: should return a list of validation errors
    return

@task
def validate(dataframe_path: str):
    # do some validations;
    # if no validation errors are found, save the data from the dataframe into Postgres
    df = pd.read_hdf(dataframe_path)
    validation_errors = _validate(df)
    if validation_errors:
        # save the validation errors
        return True
    return False
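
# A possible variation (my assumption: Airflow 2.3+): making this a short-circuit task
# would skip the downstream save() automatically when validation errors are found,
# instead of returning a flag that nothing consumes:
#
#     @task.short_circuit
#     def validate(dataframe_path: str):
#         df = pd.read_hdf(dataframe_path)
#         validation_errors = _validate(df)
#         if validation_errors:
#             # save the validation errors somewhere
#             return False   # falsy result skips the downstream tasks
#         return True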

@task
def save(dataframe_path: str):
    df = pd.read_hdf(dataframe_path)
    # convert for saving into Postgres
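
# A possible sketch for the task above (my assumption: the postgres provider package is
# installed, a 'postgres_default' connection is configured, and 'target_table' is only
# an example name):
#
#     from airflow.providers.postgres.hooks.postgres import PostgresHook
#
#     df = pd.read_hdf(dataframe_path)
#     engine = PostgresHook(postgres_conn_id='postgres_default').get_sqlalchemy_engine()
#     df.to_sql('target_table', engine, if_exists='append', index=False)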

@task_group
def process_file(filename: str, bucket_name: str):
    file_path = download_from_s3(filename, bucket_name)
    dataframe_path = read_file_into_dataframe(file_path)
    prepared = prepare_dataframe_for_working(dataframe_path)
    validated = validate(dataframe_path)
    saved = save(dataframe_path)
    dataframe_path >> prepared >> validated >> saved

@task
def final():
    # some final work
    pass

def dummy_data():
    return [
        {'filename': 'file_1.xlsx', 'bucket_name': 'bucket1'},
        {'filename': 'file_2.xlsx', 'bucket_name': 'bucket2'},
    ]

with DAG('SO_process_file', start_date=days_ago(0), schedule=None, default_args={}, catchup=False):
    tasks = []
    for item in dummy_data():
        tsk = process_file(**item)
        tasks.append(tsk)
    tasks >> final()
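
One more doubt about the whole path-passing approach: with the Local or Sequential executor all tasks share one filesystem, but with the Celery or Kubernetes executor tasks of the same run may land on different workers, so the directory would need to be a shared mount (NFS, a Docker volume, etc.), or the intermediate files would have to go back to S3 and only the keys be passed through XCom. A minimal sketch of keeping the base directory configurable (the variable name and default path are just examples):

import os

# any directory that is visible to every worker would do here
DATA_DIR = os.environ.get('PIPELINE_DATA_DIR', '/tmp/airflow_data')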