
With the help of this Stack Overflow post I just made a program (the one shown in the post) where, when a file is placed inside an S3 bucket, a task in one of my running DAGs is triggered, and then I perform some work using the BashOperator. Once it's done, though, the DAG is no longer in a running state; it goes into a success state, and if I want it to pick up another file I need to clear all the 'Past', 'Future', 'Upstream', and 'Downstream' activity. I would like this program to always be running, so that any time a new file is placed inside the S3 bucket it kicks off the tasks.

Can I continue using the S3KeySensor to do this, or do I need to figure out a way of setting up an external trigger to run my DAG? As of now my S3KeySensor is pretty pointless if it's only ever going to run once.

from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 29),
    'email': ['something@here.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('s3_triggered_emr_cluster_dag', default_args=default_args, schedule_interval='@once')

# This Activity runs a Python script that creates an AWS EMR cluster and then does EMR activity on the EMR cluster.
t2 = BashOperator(
    task_id='create_emr_cluster_1',
    bash_command='python /home/ec2-user/aws-python-sample/Create_EMR_Then_Do_EMR_Activities.py',
    retries=1,
    dag=dag)

t1 = BashOperator(
    task_id='success_log',
    bash_command='echo "Dag ran successfully" >> /home/ec2-user/s3_triggered_dag.txt',
    dag=dag)

sensor = S3KeySensor(
    task_id='new_s3_file_in_foobar-bucket',
    bucket_key='*',
    wildcard_match=True,
    bucket_name='foobar-bucket',
    s3_conn_id='s3://foobar-bucket',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag)

t1.set_upstream(sensor)
t2.set_upstream(t1)

I'm wondering if this is not possible because then it wouldn't be a directed acyclic graph; instead it would have a loop that repeats sensor -> t1 -> t2 -> sensor -> t1 -> t2 -> ... indefinitely.

Update:

My use case is pretty simple: any time a new file is placed inside a designated AWS S3 bucket, I want my DAG to be triggered and start my process of various tasks. The tasks will do things like instantiate a new AWS EMR cluster, extract the files from the S3 bucket, perform some EMR activities, then shut down the EMR cluster. From there the DAG would go back into a waiting state, waiting for new files to arrive in the S3 bucket, and then repeat the process indefinitely.

Kyle Bridenstine
    I posted an answer below that makes some assumptions around the use case. Let me know if anything is unclear or if I've misunderstood what you'd like to achieve. – Taylor D. Edmiston May 29 '18 at 20:39
  • @TaylorEdmiston I think you understood pretty well what I was trying to achieve but I also updated the post to include my use case. Thanks. – Kyle Bridenstine May 29 '18 at 20:48
    This makes sense. I think the very frequently running DAG might be simpler to get started, but the externally triggered DAG runs sounds like a nicer, more flexible setup for this use case. – Taylor D. Edmiston May 29 '18 at 21:02
  • For those who are wondering why it's not working for them: Imports are outdated/deprecated. Try this 1. `pip install apache-airflow-providers-amazon` 2. `from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor` – Darshan Oct 09 '21 at 13:51

2 Answers


Within Airflow, there isn't a concept that maps to an always-running DAG. You could have a DAG run very frequently, like every 1 to 5 minutes, if that suits your use case.

The main thing here is that the S3KeySensor pokes until it detects the first file matching the key's wildcard path (or times out), and then the downstream tasks run. But when a second, third, or fourth file lands, the sensor will have already completed for that DAG run; it won't be scheduled to run again until the next DAG run. (The looping idea you described is roughly what the scheduler does when it creates DAG runs, except not forever.)
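The frequently-running-DAG alternative might look something like the sketch below, reusing the sensor settings from the question. This is only a sketch under assumptions: the DAG id, schedule, timeout, and poke interval are illustrative, and the import follows the question's Airflow 1.x style (newer versions move the sensor into the Amazon provider package, as noted in the comments above).

```python
from datetime import datetime

from airflow import DAG
# Airflow 1.x style import, as in the question; newer releases use
# airflow.providers.amazon.aws.sensors instead.
from airflow.operators import S3KeySensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 29),
}

# The scheduler creates a fresh DAG run (and a fresh sensor task) every 5 minutes.
dag = DAG(
    's3_frequently_polled_dag',
    default_args=default_args,
    schedule_interval='*/5 * * * *',
    max_active_runs=1,   # don't let runs pile up while one is still working
    catchup=False,       # don't backfill runs for past intervals
)

sensor = S3KeySensor(
    task_id='new_s3_file_in_foobar-bucket',
    bucket_key='*',
    wildcard_match=True,
    bucket_name='foobar-bucket',
    timeout=4 * 60,      # give up before the next scheduled run is due
    poke_interval=30,
    soft_fail=True,      # mark the run skipped, not failed, when no file arrives
    dag=dag,
)
```

With `soft_fail=True`, intervals where no file lands end up skipped rather than failed, so the DAG's history stays clean.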

An external trigger definitely sounds like the best approach for your use case. That trigger can come via the Airflow CLI's trigger_dag command ($ airflow trigger_dag ...):

https://github.com/apache/incubator-airflow/blob/972086aeba4616843005b25210ba3b2596963d57/airflow/bin/cli.py#L206-L222

Or via the REST API:

https://github.com/apache/incubator-airflow/blob/5de22d7fa0d8bc6b9267ea13579b5ac5f62c8bb5/airflow/www/api/experimental/endpoints.py#L41-L89

Both turn around and call the trigger_dag function in the common (experimental) API:

https://github.com/apache/incubator-airflow/blob/089c996fbd9ecb0014dbefedff232e8699ce6283/airflow/api/common/experimental/trigger_dag.py#L28-L67

You could, for instance, set up an AWS Lambda function, invoked when a file lands on S3, that makes the trigger-DAG call.
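A minimal sketch of such a Lambda handler is below. The host name and DAG id are hypothetical placeholders, and it assumes the webserver's experimental API (linked above) is enabled and reachable from the Lambda's network:

```python
import json
import urllib.request

# Hypothetical values; substitute your webserver address and DAG id.
AIRFLOW_HOST = "http://my-airflow-webserver:8080"
DAG_ID = "s3_triggered_emr_cluster_dag"

def build_trigger_request(host, dag_id, conf):
    """Build the URL and JSON body for the experimental trigger endpoint."""
    url = "{}/api/experimental/dags/{}/dag_runs".format(host, dag_id)
    body = json.dumps({"conf": conf}).encode("utf-8")
    return url, body

def lambda_handler(event, context):
    # Forward the S3 object that fired the notification so the DAG run
    # knows which file to process.
    record = event["Records"][0]["s3"]
    conf = {"bucket": record["bucket"]["name"], "key": record["object"]["key"]}
    url, body = build_trigger_request(AIRFLOW_HOST, DAG_ID, conf)
    req = urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req) as resp:
        return {"status": resp.status}
```

Inside the DAG, tasks can then read the bucket and key back out of the run's `conf`.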

Taylor D. Edmiston
    Okay thanks. That's sort of what I expected the answer would be. I could easily set up a Lambda function to make the REST API call like you said; this is also what I would have to do if I went with AWS Data Pipeline, I'd have to activate it each time using a Lambda function. – Kyle Bridenstine May 29 '18 at 20:43
    is this approach still the same in 2020 or is there any better way to deal with this now? – Dev Aug 25 '20 at 06:38
  • @Devender Yes, the approach is still the same currently. I don't believe there are any plans to fundamentally change it. – Taylor D. Edmiston Aug 28 '20 at 14:38
  • @TaylorEdmiston I have a DAG scheduled to run every minute, and the first task is an S3KeySensor which times out in 59 seconds. I would expect that a new instance of the DAG, so a new instance of the sensor task, would run every minute, but it's running only once... should this work? What could I be doing wrong? – ajendrex Oct 30 '20 at 12:43
  • @ajendrex Hi. Yes, the approach you described should work when running the DAG normally. If you are calling via `trigger_dag`, that only creates one DAG run irrespective of the DAG's schedule. – Taylor D. Edmiston Oct 30 '20 at 20:32

Another way is to use the S3 event to trigger an AWS Lambda function, which then invokes the DAG via the Airflow API:

S3 event -> AWS Lambda -> Airflow API

Set up an S3 notification to trigger the Lambda:

https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html

Airflow API

https://airflow.apache.org/docs/apache-airflow/stable/rest-api-ref.html
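Against the stable REST API (Airflow 2.x), the Lambda side could be sketched like this. The host, DAG id, and credentials are placeholders, and it assumes the basic-auth API backend is enabled on the webserver:

```python
import base64
import json
import urllib.request

def build_dag_run_request(host, dag_id, user, password, conf):
    """Build a request for the stable REST API's trigger-DAG-run endpoint."""
    url = "{}/api/v1/dags/{}/dagRuns".format(host, dag_id)
    token = base64.b64encode("{}:{}".format(user, password).encode()).decode()
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Basic " + token,
    }
    body = json.dumps({"conf": conf}).encode("utf-8")
    return urllib.request.Request(url, data=body, headers=headers)

def lambda_handler(event, context):
    # Pass the S3 object from the notification into the DAG run's conf.
    record = event["Records"][0]["s3"]
    conf = {"bucket": record["bucket"]["name"], "key": record["object"]["key"]}
    req = build_dag_run_request(
        "http://my-airflow-webserver:8080",   # placeholder host
        "s3_triggered_emr_cluster_dag",       # placeholder DAG id
        "api_user", "api_password",           # placeholder credentials
        conf,
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```

In practice you would pull the credentials from Lambda environment variables or AWS Secrets Manager rather than hard-coding them.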

Muthu
  • From Review: A link to a solution is welcome, but please ensure your answer is useful without it: [add context around the link](https://meta.stackexchange.com/a/8259) so your fellow users will have some idea what it is and why it’s there, then quote the most relevant part of the page you're linking to in case the target page is unavailable. Answers that are little more than a link [may be deleted](https://stackoverflow.com/help/deleted-answers). See: [How do I write a good answer?](https://stackoverflow.com/help/how-to-answer) – sɐunıɔןɐqɐp Dec 30 '20 at 15:00