I have a DAG called my_dag.py that uses the S3KeySensor in Airflow 2 to check whether an S3 key exists. When I use the sensor directly inside the DAG, it works:
with TaskGroup('check_exists') as check_exists:
    path = 's3://my-bucket/data/my_file'
    poke_interval = 30
    timeout = 60 * 60
    mode = 'reschedule'
    dependency_name = 'my_file'

    S3KeySensor(
        task_id='check_' + dependency_name + '_exists',
        bucket_key=path,
        poke_interval=poke_interval,
        timeout=timeout,
        mode=mode
    )
The log of the above looks like:
[2022-05-03, 19:51:26 UTC] {s3.py:105} INFO - Poking for key : s3://my-bucket/data/my_file
[2022-05-03, 19:51:26 UTC] {base_aws.py:90} INFO - Retrieving region_name from Connection.extra_config['region_name']
[2022-05-03, 19:51:27 UTC] {taskinstance.py:1701} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
This is correct: the reschedule is expected because the file does not exist yet.
However, I want to check any number of paths in other DAGs, so I moved the sensor into a function called test in another file called helpers.py. In my_dag.py, I use a PythonOperator within the task group that calls test. It looks like this:
with TaskGroup('check_exists') as check_exists:
    path = 's3://my-bucket/data/my_file'
    dependency_name = 'my_file'

    wait_for_dependencies = PythonOperator(
        task_id='wait_for_my_file',
        python_callable=test,
        op_kwargs={
            'dependency_name': dependency_name,
            'path': path
        },
        dag=dag
    )

    wait_for_dependencies
The function test in helpers.py looks like:
def test(dependency_name, path, poke_interval=30, timeout=60 * 60, mode='reschedule'):
    S3KeySensor(
        task_id='check_' + dependency_name + '_exists',
        bucket_key=path,
        poke_interval=poke_interval,
        timeout=timeout,
        mode=mode
    )
However, when I run the DAG, the task is marked as success even though the file is not there. The logs show:
[2022-05-03, 20:07:54 UTC] {python.py:175} INFO - Done. Returned value was: None
[2022-05-03, 20:07:54 UTC] {taskinstance.py:1282} INFO - Marking task as SUCCESS.
It seems Airflow doesn't like using a sensor via a PythonOperator. Is this true, or am I doing something wrong?
My goal is to loop through multiple paths and check whether each one exists. I need the same checks in other DAGs as well, which is why I'm putting the sensor in a function that lives in a separate file.
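For context, the looping pattern I'm after looks roughly like this (a sketch outside Airflow; the second path and the sensor_kwargs helper are hypothetical, and the helper just builds the arguments I would pass to S3KeySensor):

```python
# Hypothetical list of dependencies; only the first path exists in my real DAG.
paths = {
    'my_file': 's3://my-bucket/data/my_file',
    'other_file': 's3://my-bucket/data/other_file',
}

def sensor_kwargs(dependency_name, path, poke_interval=30, timeout=60 * 60, mode='reschedule'):
    """Build the keyword arguments for one S3KeySensor, one per dependency."""
    return {
        'task_id': 'check_' + dependency_name + '_exists',
        'bucket_key': path,
        'poke_interval': poke_interval,
        'timeout': timeout,
        'mode': mode,
    }

all_kwargs = [sensor_kwargs(name, p) for name, p in paths.items()]
print([k['task_id'] for k in all_kwargs])
# ['check_my_file_exists', 'check_other_file_exists']
```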
If there are alternative ideas to doing this, I'm open!
Thanks for your help!