
I receive files hourly into my incoming bucket, and I am trying to copy them. They arrive in the below format:

s3://input-bucket/source_system1/prod/2022-09-27-00/input_folder/filename1.csv
s3://input-bucket/source_system1/prod/2022-09-27-00/input_folder/filename2.csv
s3://input-bucket/source_system1/prod/2022-09-27-01/input_folder/filename3.csv
s3://input-bucket/source_system1/prod/2022-09-27-11/input_folder/filename3.csv

I want to copy the objects into a destination folder with a single Airflow task per source system. I tried:

s3_copy = S3CopyObjectOperator(
    task_id=f"copy_s3_objects_{TC_ENV.lower()}",
    source_bucket_key="s3://input-bucket/source_system1/prod/2022-09-27-*",
    dest_bucket_name="destination-bucket",
    dest_bucket_key=f"producers/prod/event_type=source_system/execution_date={EXECUTION_DATE}",
    aws_conn_id=None,
)

The problem with the above is that I am not able to use wildcards for the source bucket key; it needs to be the complete key of a specific S3 object. I also tried a combination of S3ListOperator and S3FileTransformOperator, but those created a separate task for each object. I need one Airflow task per source system, so that a single task copies all the data matching this wildcard pattern:

s3://input-bucket/source_system1/prod/2022-09-27-*

How can I achieve this?

  • Looks like I must use boto3 to achieve this. https://stackoverflow.com/questions/30249069/listing-contents-of-a-bucket-with-boto3 – Gladiator Sep 28 '22 at 08:57
  • Did you find a solution to this using native Airflow operators? I can create a script, but I am surprised there is nothing available within Airflow that can do this – eljusticiero67 Dec 27 '22 at 22:08
  • @eljusticiero67 I was unable to do it with a readily available Airflow operator, but I wrote logic that iterates through the folders one by one from a list and then copies all files using the boto3 paginator, similar to this one https://stackoverflow.com/questions/47468148/how-to-copy-s3-object-from-one-bucket-to-another-using-python-boto3 (see the sketch below the comments) – Gladiator Dec 28 '22 at 14:54
  • Thanks! I was able to use that and add it into a `PythonOperator` – eljusticiero67 Dec 28 '22 at 19:10
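
For reference, a minimal sketch of the boto3 paginator approach described in the comment above; the bucket names and the list of hourly prefixes are placeholders following the question's layout, not a confirmed implementation:

import boto3

# Placeholder bucket names and hourly prefixes, modeled on the question's layout.
SOURCE_BUCKET = 'input-bucket'
DEST_BUCKET = 'destination-bucket'
PREFIXES = [
    'source_system1/prod/2022-09-27-00/',
    'source_system1/prod/2022-09-27-01/',
]

def copy_prefixes():
    s3 = boto3.client('s3')
    paginator = s3.get_paginator('list_objects_v2')
    for prefix in PREFIXES:
        # Page through every object under the prefix and copy it across.
        for page in paginator.paginate(Bucket=SOURCE_BUCKET, Prefix=prefix):
            for obj in page.get('Contents', []):
                s3.copy_object(
                    Bucket=DEST_BUCKET,
                    Key=obj['Key'],
                    CopySource={'Bucket': SOURCE_BUCKET, 'Key': obj['Key']},
                )

A function like this can then be wrapped in a `PythonOperator`, as noted in the last comment.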

1 Answer


If you wish to achieve this in one specific task, I recommend using the PythonOperator to interact with the S3Hook as follows:

from airflow import DAG
from airflow.models import Variable
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime

def s3_copy(**kwargs):
    hook = S3Hook(aws_conn_id='aws_default')
    source_bucket = Variable.get('source_bucket')
    # List every key in the source bucket; pass a prefix such as
    # 'source_system1/prod/2022-09-27-' to narrow the listing to one run.
    keys = hook.list_keys(bucket_name=source_bucket, prefix='')
    for key in keys:
        # Copy each object to the destination bucket under the same key.
        hook.copy_object(
            source_bucket_name=source_bucket,
            dest_bucket_name=Variable.get('dest_bucket'),
            source_bucket_key=key,
            dest_bucket_key=key,
            acl_policy='bucket-owner-full-control'
        )

with DAG('example_dag',
        schedule_interval='0 1 * * *',
        start_date=datetime(2023, 1, 1),
        catchup=False
    ):

    e0 = EmptyOperator(task_id='start')

    t1 = PythonOperator(
        task_id='example_copy',
        python_callable=s3_copy
    )

    e0 >> t1

You could build on this base logic to make it more performant or add filtering, etc.
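
For example, here is a minimal sketch of that filtering idea, reusing the buckets and key layout from the question; the hard-coded prefix, the `fnmatch` pattern, and the flattened destination key (with the `execution_date=` part omitted) are assumptions for illustration:

import fnmatch

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def s3_copy_filtered(**kwargs):
    hook = S3Hook(aws_conn_id='aws_default')
    source_bucket = 'input-bucket'
    dest_bucket = 'destination-bucket'
    # list_keys takes a literal prefix (no wildcards), so pass the fixed part
    # of the pattern and apply the wildcard match client-side.
    prefix = 'source_system1/prod/2022-09-27-'
    pattern = 'source_system1/prod/2022-09-27-*/input_folder/*.csv'

    keys = hook.list_keys(bucket_name=source_bucket, prefix=prefix) or []
    for key in keys:
        # Skip anything that does not match the wildcard pattern.
        if not fnmatch.fnmatch(key, pattern):
            continue
        hook.copy_object(
            source_bucket_name=source_bucket,
            dest_bucket_name=dest_bucket,
            source_bucket_key=key,
            # Keep only the file name under the destination layout from the
            # question (execution_date templating omitted here).
            dest_bucket_key=f"producers/prod/event_type=source_system/{key.split('/')[-1]}",
        )

Swapping a callable like this into the PythonOperator above keeps the whole copy in a single task per source system, which is what the question asks for.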

Sleep Deprived Bulbasaur