This is my code so far:
t1 = S3ListOperator(
task_id='list_s3_files',
bucket='mybucket',
prefix='v01/{{ds}}/',
delimiter='/'
)
I will then copy the latest file across using S3CopyObjectOperator.
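The copy step I have in mind would look roughly like this (the destination bucket and key are placeholders, and the source key is the part I'm missing):

t2 = S3CopyObjectOperator(
    task_id='copy_latest_file',
    source_bucket_name='mybucket',
    source_bucket_key='???',            # <-- the latest key found by t1 should go here
    dest_bucket_name='my-dest-bucket',  # placeholder
    dest_bucket_key='latest/{{ds}}'     # placeholder
)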
Not a particularly "Airflow" way to do it, but you could handle this with a PythonOperator:
import boto3

# List every object in the bucket and sort by last-modified time.
all_objects = boto3.resource('s3').Bucket(your_bucket_name).objects.all()
sorted_objs = sorted(all_objects, key=lambda o: o.last_modified)
latest_file = sorted_objs[-1]
Though it's not an "industrial solution", as it requires listing every object just to sort them; S3 doesn't support "querying" by last-modified like that.
If you have a predictable way to segment the files (e.g. per-day or per-hour prefixes), it wouldn't be that bad though.
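If you do go that route, here's a minimal sketch of how it could be wired into the DAG: a PythonOperator pushes the newest key under the day's prefix to XCom, and S3CopyObjectOperator reads it through a templated field. The task ids, destination bucket and destination key are placeholders, and the exact import paths depend on your Airflow/provider versions.

import boto3
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator

def find_latest_key(prefix, **context):
    # Only list objects under the day's prefix, then sort by modification time.
    bucket = boto3.resource('s3').Bucket('mybucket')
    objs = sorted(bucket.objects.filter(Prefix=prefix), key=lambda o: o.last_modified)
    # The return value is pushed to XCom automatically.
    return objs[-1].key

find_latest = PythonOperator(
    task_id='find_latest_file',
    python_callable=find_latest_key,
    op_kwargs={'prefix': 'v01/{{ ds }}/'},
)

copy_latest = S3CopyObjectOperator(
    task_id='copy_latest_file',
    source_bucket_name='mybucket',
    source_bucket_key="{{ ti.xcom_pull(task_ids='find_latest_file') }}",
    dest_bucket_name='my-dest-bucket',  # placeholder
    dest_bucket_key='latest/{{ ds }}',  # placeholder
)

find_latest >> copy_latest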