I am trying to copy files that arrive hourly in my incoming bucket, with keys in the following format:
s3://input-bucket/source_system1/prod/2022-09-27-00/input_folder/filename1.csv
s3://input-bucket/source_system1/prod/2022-09-27-00/input_folder/filename2.csv
s3://input-bucket/source_system1/prod/2022-09-27-01/input_folder/filename3.csv
s3://input-bucket/source_system1/prod/2022-09-27-11/input_folder/filename3.csv
I want to copy the objects into a destination folder with a single Airflow task per source system. I tried:
from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator

s3_copy = S3CopyObjectOperator(
    task_id=f"copy_s3_objects_{TC_ENV.lower()}",
    source_bucket_key="s3://input-bucket/source_system1/prod/2022-09-27-*",
    dest_bucket_name="destination-bucket",
    dest_bucket_key=f"producers/prod/event_type=source_system/execution_date={EXECUTION_DATE}",
    aws_conn_id=None,
)
The problem with the above is that wildcards are not allowed in source_bucket_key; it must be the complete key of a specific S3 object. I also tried combining S3ListOperator with S3FileTransformOperator, but those approaches create one task per object. What I need is one Airflow task per source system that copies all the data matching this wildcard pattern:
s3://input-bucket/source_system1/prod/2022-09-27-*
How can I achieve this?
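Conceptually, I imagine one task that lists every key under the hourly prefix and copies each object itself. Below is a rough sketch of the behavior I'm after, using S3Hook.list_keys and S3Hook.copy_object inside a single PythonOperator; EXECUTION_DATE and TC_ENV are the same placeholders as in my attempt above, and I'm not sure this is the idiomatic pattern:

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def copy_source_system_objects():
    # One task per source system: list every object under the
    # date prefix, then copy each one into the destination layout.
    hook = S3Hook(aws_conn_id=None)
    keys = hook.list_keys(
        bucket_name="input-bucket",
        prefix=f"source_system1/prod/{EXECUTION_DATE}",  # matches 2022-09-27-00, -01, ...
    ) or []
    for key in keys:
        filename = key.split("/")[-1]
        hook.copy_object(
            source_bucket_name="input-bucket",
            source_bucket_key=key,
            dest_bucket_name="destination-bucket",
            dest_bucket_key=(
                "producers/prod/event_type=source_system/"
                f"execution_date={EXECUTION_DATE}/{filename}"
            ),
        )

s3_copy = PythonOperator(
    task_id=f"copy_s3_objects_{TC_ENV.lower()}",
    python_callable=copy_source_system_objects,
)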