So I have task A, which copies some unknown number of files into a folder. Task B runs on each of those files in the folder. I have no way of knowing the number of files beforehand, as they keep changing. Is there a way to make this work in Airflow?
spans = os.listdir('/home/abc/tmpFolder')
counter = 0
for s in spans:
    counter += 1
    # Local source path and GCS destination path for this file
    src_path = os.path.join('/home/abc/tmpFolder', s)
    dst_path = 'tmp/' + s
    run_this = FileToGoogleCloudStorageOperator(
        task_id='gcp_task_' + str(counter),
        src=src_path,
        dst=dst_path,
        bucket='gcpBucket',
        google_cloud_storage_conn_id='gcp',
        mime_type='text/plain',
        dag=dag,
    )
    # Wire each per-file upload task downstream of the dummy marker task
    dummy_operator_two.set_downstream(run_this)
I am getting the names of all the files in the directory and then creating an operator for each of them, but Airflow doesn't seem to work that way, since it apparently needs to know the number of tasks beforehand.
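For context, here is a trimmed-down, self-contained version of the DAG file I'm testing with. The default_args, schedule, DAG id, and the dummy operator are simplified placeholders, and I'm assuming the Airflow 1.x contrib import path for the operator; the loop at the bottom is the same one shown above.

import os
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 1, 1),  # placeholder
}

dag = DAG('copy_tmp_folder_to_gcs', default_args=default_args, schedule_interval=None)

# Task A (copying files into /home/abc/tmpFolder) runs upstream of this marker task
dummy_operator_two = DummyOperator(task_id='dummy_task_two', dag=dag)

# One upload task per file found when the DAG file is parsed
for counter, s in enumerate(os.listdir('/home/abc/tmpFolder'), start=1):
    run_this = FileToGoogleCloudStorageOperator(
        task_id='gcp_task_' + str(counter),
        src=os.path.join('/home/abc/tmpFolder', s),
        dst='tmp/' + s,
        bucket='gcpBucket',
        google_cloud_storage_conn_id='gcp',
        mime_type='text/plain',
        dag=dag,
    )
    dummy_operator_two.set_downstream(run_this)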