
I'm trying to figure out how to process files from S3. Below are some of my ideas and questions, or maybe you could share your experience. The main difficulty is passing a file downloaded from S3 and a pandas DataFrame from one task to another: they are not serializable, so I cannot use XCom. The only way I could find is to save them on the host filesystem and pass file paths from one task to another.

import pandas as pd
from airflow import DAG
from airflow.decorators import task, task_group
from airflow.utils.dates import days_ago


@task
def download_from_s3(filename: str, bucket_name: str):
    # as far as I can see there is no way to pass raw bytes from one task to another,
    # so the only option is to store the file on the filesystem and return its path?
    # the file size is about 100 MB
    return '???'


@task
def read_file_into_dataframe(file_path: str):
    # the dataframe can be huge - about 1 GB or more
    df = pd.read_excel(file_path)
    # save the dataframe into a file, e.g. HDF5 or Parquet, and return its path?

    return '???'


@task
def prepare_dataframe_for_working(dataframe_path: str):
    # set index, remove some blank rows and columns
    df = pd.read_hdf(dataframe_path)
    # do some processing
    return dataframe_path


def _validate(df: pd.DataFrame):
    return


@task
def validate(dataframe_path: str):
    # do some validations;
    # if no validation errors are found, save the data from the dataframe into Postgres
    df = pd.read_hdf(dataframe_path)
    validation_errors = _validate(df)

    if validation_errors:
        # save the validation errors
        return True

    return False


@task
def save(dataframe_path: str):
    df = pd.read_hdf(dataframe_path)
    # convert for saving into Postgres


@task_group
def process_file(filename: str, bucket_name: str):

    file_path = download_from_s3(filename, bucket_name)
    dataframe_path = read_file_into_dataframe(file_path)

    dataframe_path >> prepare_dataframe_for_working(dataframe_path) >> validate(dataframe_path) >> save(dataframe_path)


@task
def final():
    # some final work
    pass


def dummy_data():
    return [
        {'filename': 'file_1.xlsx', 'bucket_name': 'bucket1'},
        {'filename': 'file_2.xlsx', 'bucket_name': 'bucket2'},
    ]


with DAG('SO_process_file', start_date=days_ago(0), schedule=None, default_args={}, catchup=False):
    tasks = []

    for item in dummy_data():
        tsk = process_file(**item)
        tasks.append(tsk)

    tasks >> final()

1 Answer


Pandas allows you to read data directly from S3 without writing it to disk. This discussion Link should help you.
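For example, here is a minimal sketch of how the first tasks could look, assuming s3fs and pyarrow are installed on the workers and AWS credentials are available to them. The bucket and file names come from the dummy data in your question, and the intermediate/ prefix is just a hypothetical location for intermediate results. With this approach the separate download task becomes unnecessary, and only short S3 paths go through XCom.

import pandas as pd
from airflow.decorators import task


@task
def read_file_into_dataframe(filename: str, bucket_name: str) -> str:
    # pandas can read straight from an s3:// URL via s3fs, no local copy needed
    df = pd.read_excel(f's3://{bucket_name}/{filename}')

    # write the (possibly large) intermediate result back to S3 as Parquet
    # and pass only the small path string through XCom
    parquet_path = f's3://{bucket_name}/intermediate/{filename}.parquet'
    df.to_parquet(parquet_path)
    return parquet_path


@task
def prepare_dataframe_for_working(dataframe_path: str) -> str:
    # set index, remove blank rows and columns, then write back to the same key
    df = pd.read_parquet(dataframe_path)
    df = df.dropna(how='all').dropna(axis=1, how='all')
    df.to_parquet(dataframe_path)
    return dataframe_path

The same read_parquet / to_parquet pattern against s3:// paths works for the validate and save tasks as well.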