
I have multiple sources sending incremental data, and there are no metadata columns at the record level. How can I ensure that Airflow processes the data in the order of receipt? I may end up processing the files out of order.

Does Airflow have a built-in method or way to handle files in the order they were received?

Airflow version used: 2.4.3

rajdallas
  • How are you accessing the data from within Airflow? Is it stored somewhere between your sources and being used in your pipelines? I'm trying to understand a bit more about your architecture. – TJaniF Jan 27 '23 at 13:03
  • S3 is my source and target is a database. – rajdallas Jan 27 '23 at 21:31

1 Answer


You can use boto3 to retrieve the last-modified timestamp of each file in your S3 bucket within a PythonOperator.

This question has an answer that shows how to pull the last-modified timestamp. You can then sort the keys by that timestamp, process the files in that order, and move processed files to an archive folder or bucket so only new files are picked up on each DAG run.
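
A minimal sketch of that approach, assuming a hypothetical bucket `my-bucket` with an `incoming/` prefix for unprocessed files and an `archive/` prefix for processed ones (the names and the `process_files_in_order` callable are placeholders for your setup):

```python
import boto3
from airflow.operators.python import PythonOperator

def process_files_in_order():
    """Process S3 files oldest-first, then move them to an archive prefix."""
    s3 = boto3.client("s3")
    bucket = "my-bucket"         # hypothetical bucket name
    prefix = "incoming/"         # hypothetical prefix for unprocessed files
    archive_prefix = "archive/"  # hypothetical prefix for processed files

    # List pending objects (use a paginator if you expect more than 1000 keys).
    objects = s3.list_objects_v2(Bucket=bucket, Prefix=prefix).get("Contents", [])

    # Sort by LastModified so the oldest file is handled first.
    for obj in sorted(objects, key=lambda o: o["LastModified"]):
        key = obj["Key"]
        if key.endswith("/"):
            continue  # skip the "folder" placeholder object, if present

        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
        # ... load `body` into your target database here ...

        # Move the processed file out of the incoming prefix so the
        # next DAG run only sees new files.
        s3.copy_object(
            Bucket=bucket,
            CopySource={"Bucket": bucket, "Key": key},
            Key=archive_prefix + key[len(prefix):],
        )
        s3.delete_object(Bucket=bucket, Key=key)

# Inside your DAG definition:
process_task = PythonOperator(
    task_id="process_s3_files_in_order",
    python_callable=process_files_in_order,
)
```

Processing everything in a single task keeps the ordering guarantee simple; fanning the files out to parallel tasks would reintroduce the out-of-order risk.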

As a general note, if you have any control over your sources, I would prefer adding a timestamp at the record level; that seems like the easier option.

TJaniF