
So I have task A, which copies an unknown number of files into a folder. Task B then runs on each of those files in the folder. I have no way of knowing the number of files beforehand, as they keep changing. Is there a way to make this work in Airflow?

import os

# `dag` and `dummy_operator_two` are defined earlier in the same DAG file.
spans = os.listdir('/home/abc/tmpFolder')
counter = 0
for s in spans:
    # join the folder and file name (plain concatenation would drop the '/')
    src_path = os.path.join('/home/abc/tmpFolder', s)
    dst_path = "tmp/" + s
    counter += 1
    run_this = FileToGoogleCloudStorageOperator(
        task_id='gcp_task_' + str(counter),
        src=src_path,
        dst=dst_path,
        bucket='gcpBucket',
        google_cloud_storage_conn_id='gcp',
        mime_type='text/plain',
        dag=dag)
    dummy_operator_two.set_downstream(run_this)

I am getting the names of all the files in the directory and then creating an operator for each of them, but Airflow doesn't seem to work that way, as it needs to know the number of tasks beforehand.

John Constantine
  • what error are you getting? – pacuna Sep 11 '18 at 02:44
  • No, it doesn't need to know the number of tasks beforehand. It just needs the web server to be restarted before scheduling/running the DAG to load the new tasks, and yes, each task should have a unique task_id. – mad_ Sep 11 '18 at 14:47

4 Answers


I don't expect Airflow to modify a DAG while a DagRun is active, so I wouldn't bet money on getting the files and then appending tasks within the same DAG. That said, Airflow regenerates DAGs every few seconds. You could have one DAG that gets the files and another DAG that processes them. After getting the files, the first DAG would have to wait a minute to make sure Airflow noticed the new tasks, and then kick off the second DAG with a TriggerDagRunOperator.

DAG1:

import time

from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def wait_a_minute():
    # give the scheduler time to pick up the regenerated DAG2 before triggering it
    time.sleep(60)

get_files = DummyOperator(dag=dag, task_id='get_files')
give_airflow_time_to_rebuild_DAG2 = PythonOperator(dag=dag, task_id='give_airflow_time_to_rebuild_DAG2', python_callable=wait_a_minute)
trigger_DAG2 = TriggerDagRunOperator(dag=dag, task_id='trigger_DAG2', trigger_dag_id='DAG2', execution_date='{{ ds }}')

get_files >> give_airflow_time_to_rebuild_DAG2 >> trigger_DAG2

DAG2:

from airflow.operators.dummy_operator import DummyOperator

# `get_files_to_process` stands in for however you list the files that landed in the folder
pre_process = DummyOperator(dag=dag, task_id='pre_process')
post_process = DummyOperator(dag=dag, task_id='post_process')

files = get_files_to_process()

# one task per file; the list is re-evaluated every time the scheduler parses this file
for file in files:
    process = DummyOperator(dag=dag, task_id=f'process_{file}')
    pre_process >> process >> post_process

More of a hack than a solution, but something like this should work. There are issues with external triggers and dynamic tasks, though; I typically stumble into scheduler problems when I have to use depends_on_past=True.

Justinas Marozas

I've got this type of thing to work by making separate pipelines instead of separate tasks.
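One way to read "separate pipelines" (an assumption on my part, the answer doesn't show code) is to generate one small DAG per file and register each one in globals(), so the scheduler treats every file as its own pipeline. The folder, bucket, and IDs below are placeholders:

# Hypothetical sketch: one DAG ("pipeline") per file instead of one task per file.
import os
from datetime import timedelta

import airflow
from airflow import DAG
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator

SRC_DIR = '/home/abc/tmpFolder'  # placeholder source folder

for filename in os.listdir(SRC_DIR):
    dag_id = 'upload_' + filename.replace('.', '_')
    dag = DAG(dag_id,
              start_date=airflow.utils.dates.days_ago(1),
              schedule_interval=timedelta(days=1))

    FileToGoogleCloudStorageOperator(
        task_id='gcp_upload',
        src=os.path.join(SRC_DIR, filename),
        dst='tmp/' + filename,
        bucket='gcpBucket',
        google_cloud_storage_conn_id='gcp',
        mime_type='text/plain',
        dag=dag)

    # registering each DAG object in globals() is what makes the scheduler pick it up
    globals()[dag_id] = dag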

Damon Cool

Have you tried using the glob module and modifying your pipeline to process all files in a given directory?
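The answer stops at the suggestion; a minimal sketch of what that could look like (the folder, bucket, and connection id are carried over from the question, and `dag` is assumed to be defined as usual in the same file) is:

# Hypothetical sketch: expand a pattern with glob and create one upload task per match.
import glob
import os

from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator

# `dag` is assumed to be the DAG object defined elsewhere in this file
for src_path in glob.glob('/home/abc/tmpFolder/*'):
    filename = os.path.basename(src_path)
    FileToGoogleCloudStorageOperator(
        task_id='gcp_task_' + filename.replace('.', '_'),
        src=src_path,
        dst='tmp/' + filename,
        bucket='gcpBucket',
        google_cloud_storage_conn_id='gcp',
        mime_type='text/plain',
        dag=dag)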


Following my blog post on Creating a Dynamic Workflow using Apache Airflow, you can test the following code:

import airflow
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator
from datetime import timedelta
from os import listdir

default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('dynamic',
          default_args=default_args,
          description='Dynamic DAG',
          schedule_interval=timedelta(days=1))

copy_files = BashOperator(task_id='cp_files',
                          depends_on_past=False,
                          bash_command='cp /tmp/filetocopy/* /tmp/filetoprocess',
                          dag=dag)

start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

start >> copy_files

# one upload task per file currently in /tmp/filetoprocess; the list is
# re-evaluated every time the scheduler parses this file
spans = listdir('/tmp/filetoprocess')

for counter, s in enumerate(spans, start=1):
    src_path = '/tmp/filetoprocess/' + s
    dst_path = '/tmp/dest/' + s
    task = FileToGoogleCloudStorageOperator(
        task_id='gcp_task_' + str(counter),
        src=src_path,
        dst=dst_path,
        bucket='gcpBucket',
        google_cloud_storage_conn_id='gcp',
        mime_type='text/plain',
        dag=dag)
    task.set_upstream(copy_files)
    task.set_downstream(end)

With this code, you need to already have some files in your /tmp/filetoprocess folder (you can also create a Python function that checks whether there are any files and, if not, creates a DummyOperator just so the entire workflow keeps working); otherwise, the Airflow scheduler will have trouble generating a proper DAG.
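A minimal sketch of that fallback (my own reading of the suggestion, reusing the imports and operator names from the snippet above; the placeholder task_id is made up) could be:

# Hypothetical fallback: if the folder is empty, wire in a DummyOperator so the
# DAG still has a complete path from copy_files to end.
files = listdir('/tmp/filetoprocess')

if not files:
    placeholder = DummyOperator(task_id='no_files_to_process', dag=dag)
    copy_files >> placeholder >> end
else:
    for counter, s in enumerate(files, start=1):
        task = FileToGoogleCloudStorageOperator(
            task_id='gcp_task_' + str(counter),
            src='/tmp/filetoprocess/' + s,
            dst='/tmp/dest/' + s,
            bucket='gcpBucket',
            google_cloud_storage_conn_id='gcp',
            mime_type='text/plain',
            dag=dag)
        copy_files >> task >> end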

I have tested it with the new Apache Airflow release (v1.10) and it seems to work perfectly.

Dynamic tasks on Airflow DAG