I love the idea of Airflow, but I'm stuck on the basics. Since yesterday I have had Airflow running on an Ubuntu VM with a Postgres backend. I can see the dashboard and the example data :)) What I want now is to migrate an example script that I use to turn raw data into prepared data.

Imagine you have a folder of CSV files. Today my script iterates through it, appending each file to a list that is then converted into a df. After that I prepare the column names, do some data cleaning, and write the result into a different format.

1: pd.read_csv for files in directory

2: create a df

3: clean column names

4: clean values (parallel to step 3)

5: write the result to a database

How would I have to organize my files for Airflow? What should the script look like? Do I pass a single method or a single file, or do I have to create several files, one for each part? I'm lacking the basic concept at this point :( Everything I read about Airflow is way more complex than my simple case. I was also considering stepping away from Airflow to Bonobo, Mara, or Luigi, but I think Airflow is worth it?!

Christian

2 Answers

I'd use the PythonOperator, put the whole code into a Python function, create one Airflow task and that's it.
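
A minimal sketch of that single-task layout, assuming Airflow 2.x and pandas are installed. The names `clean_column_name`, `raw_to_prepared`, and `build_dag`, the `/data/...` paths, and the use of SQLite as a stand-in for the real database are all illustrative assumptions, not something from the question:

```python
import glob
import os
import re
import sqlite3


def clean_column_name(name):
    """Lower-case a column name and collapse non-alphanumeric runs into '_'."""
    return re.sub(r"[^0-9a-z]+", "_", str(name).strip().lower()).strip("_")


def raw_to_prepared(src_dir, db_path, table="prepared"):
    """Read every CSV in src_dir, clean it, and write it to one table."""
    # Imported inside the function so that parsing the DAG file stays cheap.
    import pandas as pd

    paths = sorted(glob.glob(os.path.join(src_dir, "*.csv")))
    if not paths:
        raise FileNotFoundError(f"no CSV files found in {src_dir}")

    df = pd.concat([pd.read_csv(p) for p in paths], ignore_index=True)
    df.columns = [clean_column_name(c) for c in df.columns]
    df = df.dropna(how="all")  # placeholder for the real value cleaning

    # SQLite is only a stand-in here; swap in your own database connection.
    conn = sqlite3.connect(db_path)
    try:
        df.to_sql(table, conn, if_exists="replace", index=False)
    finally:
        conn.close()
    return len(df)


def build_dag():
    """Wrap the function above in a DAG with exactly one task."""
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    with DAG(
        dag_id="raw_to_prepared",
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,  # trigger manually from the dashboard
        catchup=False,
    ) as dag:
        PythonOperator(
            task_id="raw_to_prepared",
            python_callable=raw_to_prepared,
            op_kwargs={"src_dir": "/data/raw", "db_path": "/data/prepared.db"},
        )
    return dag


# Airflow only discovers DAG objects that live at module level, so the file
# in your dags/ folder would end with:  dag = build_dag()
```

Everything lives in one file in the `dags/` folder, and the existing script body moves into `raw_to_prepared` more or less unchanged.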

It would also be possible to put the loading of the CSV files into one function and the database writing into another, if it is necessary to split those steps. All of this would go into one single DAG.

So your one DAG would have three tasks like:

loadCSV (PythonOperator)
parseDF (PythonOperator)
pushToDB (PythonOperator)

If you use several tasks, you need Airflow's XCom to pass data between them. In the beginning it is easier to just use one task.
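
For comparison, a rough sketch of the three-task variant, again assuming Airflow 2.x and pandas. The task IDs follow the list above; the function names, the `/data/...` paths, the staging file, and SQLite as the target are hypothetical. Only small values (file paths) go through XCom, and the data itself is staged on disk:

```python
import glob
import os


def load_csv(src_dir):
    """Task loadCSV: the returned list of paths is pushed to XCom."""
    return sorted(glob.glob(os.path.join(src_dir, "*.csv")))


def parse_df(ti, staging_path):
    """Task parseDF: pull the paths, clean the data, stage the result."""
    import pandas as pd  # local import keeps DAG parsing cheap

    paths = ti.xcom_pull(task_ids="loadCSV")
    df = pd.concat([pd.read_csv(p) for p in paths], ignore_index=True)
    df.columns = [str(c).strip().lower().replace(" ", "_") for c in df.columns]
    df.to_csv(staging_path, index=False)
    return staging_path  # pushed to XCom for the next task


def push_to_db(ti, db_path, table="prepared"):
    """Task pushToDB: pull the staged file path and write it to a table."""
    import sqlite3

    import pandas as pd

    staging_path = ti.xcom_pull(task_ids="parseDF")
    df = pd.read_csv(staging_path)
    conn = sqlite3.connect(db_path)  # stand-in for the real database
    try:
        df.to_sql(table, conn, if_exists="replace", index=False)
    finally:
        conn.close()


def build_dag():
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    with DAG(
        dag_id="csv_to_db",
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        load = PythonOperator(
            task_id="loadCSV",
            python_callable=load_csv,
            op_kwargs={"src_dir": "/data/raw"},
        )
        parse = PythonOperator(
            task_id="parseDF",
            python_callable=parse_df,
            op_kwargs={"staging_path": "/data/staging/prepared.csv"},
        )
        push = PythonOperator(
            task_id="pushToDB",
            python_callable=push_to_db,
            op_kwargs={"db_path": "/data/prepared.db"},
        )
        load >> parse >> push  # run the three tasks in order
    return dag


# In the real DAG file this would be followed by:  dag = build_dag()
```

Airflow passes the task instance as `ti` because the callables name it in their signature, and whatever a callable returns is stored as that task's XCom value.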

There are several code examples here under the tag airflow. When you have created something, ask again.

tobi6
  • Just as an added note, here is another S.O. thread, which showcases something similar: https://stackoverflow.com/questions/57861233/using-the-output-of-one-python-task-and-using-as-the-input-to-another-python-tas – d_- May 19 '20 at 09:23
  • Compared with exchanging data via an external `.csv` file, the vineyard XCom backend enables more efficient zero-copy data sharing between tasks in a DAG, see also: https://v6d.io/notes/airflow.html – sighingnow Sep 08 '21 at 05:47

For people who are still stuck on this issue: we have recently implemented a custom XCom backend for Airflow, backed by vineyard, to support this kind of case.

The provider is open source and available here: https://github.com/v6d-io/v6d/tree/main/python/vineyard/contrib/airflow

With the vineyard XCom backend, a DAG can produce and consume pandas.DataFrame objects directly, without any "to_csv" + "from_csv" hacks:

import numpy as np
import pandas as pd

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
}

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def taskflow_etl_pandas():
    @task()
    def extract():
        # Build a sample DataFrame; the return value goes to the XCom backend.
        order_df = pd.DataFrame({
            'a': np.random.rand(100000),
            'b': np.random.rand(100000),
        })
        return order_df

    @task(multiple_outputs=True)
    def transform(order_df: pd.DataFrame):
        # multiple_outputs=True stores each key of the returned dict as its own XCom.
        return {"total_order_value": order_df["a"].sum()}

    @task()
    def load(total_order_value: float):
        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])

taskflow_etl_pandas_dag = taskflow_etl_pandas()

Hope that helps in your case.

sighingnow