
I have a series of Airflow tasks that require some fan-out and collection of results for bulk processing, and I'm having a hard time visualizing how it should work.

Roughly, I fetch a list of files, process them individually through a series of transformation tasks, then load them into a database.

Fetch task(s) overview

  1. Fetch list of JSON files to download
  2. Download each JSON file
  3. For each file begin "processing workflow"

"Processing workflow" task(s) overview

  1. Parse JSON file
  2. Reshape JSON data
  3. Run suite of (stateless) error correction functions on reshaped JSON data
  4. Insert JSON data into database
  5. Run a suite of DB-level functions on that just-inserted data
  6. Run more DB-level functions on data from step 5

It's unclear how to, for example, begin all of the "processing workflow" tasks for each file from within a single task. Should bulk tasks like this each be a sub-DAG of tasks? How would you model this?

Carson

2 Answers


I have tackled problems like this in Airflow using 2 DAGs.

DAG1: Fetch the list of JSON files to download (store the list locally in a simple environment, or put it on NFS or a common mount point in a more complex environment)
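A minimal sketch of what DAG1 might look like, assuming a shared mount point and reusing the file_directory/metadata names that the DAG2 snippet below reads from; the paths and the fetch_file_list callable are illustrative assumptions:

import csv
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

file_directory = '/mnt/shared/json_staging/'  # assumed common mount point
metadata = 'metadata.csv'                     # file list consumed by DAG2


def fetch_file_list():
    # Placeholder: call whatever endpoint returns the list of JSON files,
    # then write it as the metadata CSV that DAG2 loops over.
    files = ['a', 'b', 'c']
    with open(file_directory + metadata, 'w') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=['file'])
        writer.writeheader()
        writer.writerows({'file': f} for f in files)


dag1 = DAG('fetch_json_list', start_date=datetime(2021, 1, 1),
           schedule_interval='@daily', catchup=False)

fetch_list = PythonOperator(
    task_id='fetch_file_list',
    python_callable=fetch_file_list,
    dag=dag1,
)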

DAG2: Loop over list of files to create tasks specific to each file

  • Download JSON file
  • Parse JSON file
  • Reshape JSON data
  • Run suite of (stateless) error correction functions on reshaped JSON data
  • Insert JSON data into database
  • Run a suite of DB-level functions on that just-inserted data
  • Run more DB-level functions on the data from the previous step

Here's an incomplete snippet showing how to loop over a CSV file of metadata and generate the download and parse steps as BashOperator tasks for each file.

import csv

from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

# dag, file_directory, metadata, templated_download and templated_parse
# are assumed to be defined elsewhere in the DAG file.

dlJSON = {}
parseJSON = {}

# single downstream task that every per-file chain feeds into
all_tasks = DummyOperator(task_id='all_tasks', dag=dag)

with open(file_directory + metadata) as csvfile:
    reader = csv.DictReader(csvfile)
    rows = [_ for _ in reader if _]  # skip empty rows

    for row in rows:

        dlJSON['dlJSON_{}'.format(row['file'])] = BashOperator(
            task_id='dlJSON_{}'.format(row['file']),
            bash_command=templated_download,
            xcom_push=True,
            params={
                'file': row['file'],
                'directory': file_directory,
                'outfile': '{}.json'.format(row['file']),
            },
            dag=dag,
        )

        parseJSON['parseJSON_{}'.format(row['file'])] = BashOperator(
            task_id='parseJSON_{}'.format(row['file']),
            bash_command=templated_parse,
            xcom_push=True,
            params={
                'file': row['file'],
                'directory': file_directory,
                'infile': '{}.json'.format(row['file']),
            },
            dag=dag,
        )

        # per-file chain: download >> parse, then collect into all_tasks
        dlJSON['dlJSON_{}'.format(row['file'])] >> parseJSON['parseJSON_{}'.format(row['file'])] >> all_tasks
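
One way to sequence the two DAGs is to have DAG1 trigger DAG2 once the metadata CSV has been written; a hedged sketch using TriggerDagRunOperator, reusing the dag1/fetch_list names from the DAG1 sketch above (the DAG2 dag_id is an assumption):

# In Airflow 2.x the import path is airflow.operators.trigger_dagrun.
from airflow.operators.dagrun_operator import TriggerDagRunOperator

trigger_processing = TriggerDagRunOperator(
    task_id='trigger_processing_dag',
    trigger_dag_id='process_json_files',  # assumed dag_id of DAG2
    dag=dag1,
)

fetch_list >> trigger_processing

Note that DAG2's per-file tasks are generated when the scheduler parses the DAG file, so the metadata CSV has to exist (and be re-read on the next parse) before those tasks appear.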
Damon Cool

This feature is in active development and is tentatively scheduled for Airflow 2.3 (Q1 2022). See Airflow Improvement Proposal 42 (Dynamic Task Mapping) for details.

Prototype code from the AIP showing how to cleanly delete a dynamic list of files in S3. Note the use of partial (to pre-bind some operator arguments) and map:

from airflow.decorators import task
from airflow.providers.amazon.aws.operators.s3_delete_objects import S3DeleteObjectsOperator


@task
def get_files_from_s3():
    # S3 list operation
    ...


my_files = get_files_from_s3()
s3_delete_files = S3DeleteObjectsOperator.partial(
    aws_conn_id="my-aws-conn-id",
    bucket="my-bucket",
).map(key=my_files)

Copied from my other post (Proper way to create dynamic workflows in Airflow).
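
In the Airflow 2.3 release this API landed as partial()/expand() rather than map(). A rough TaskFlow-style sketch of the question's fetch/download/parse fan-out against that released API (the DAG and task names here are hypothetical):

import pendulum

from airflow.decorators import dag, task


@dag(schedule_interval=None, start_date=pendulum.datetime(2022, 1, 1), catchup=False)
def json_fanout():

    @task
    def fetch_file_list():
        # Placeholder: return the list of JSON files to download.
        return ["a.json", "b.json", "c.json"]

    @task
    def download(filename):
        # Placeholder: download one file and return its local path.
        return "/data/" + filename

    @task
    def parse_and_load(path):
        # Placeholder: parse, reshape, error-correct, and load into the database.
        ...

    downloaded = download.expand(filename=fetch_file_list())
    parse_and_load.expand(path=downloaded)


json_fanout()

Each element returned by fetch_file_list becomes its own mapped download task instance at run time, which is the per-file fan-out the question describes.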

Gabe