Note
- Please read and understand the question thoroughly
- It cannot be solved by a simple BranchPythonOperator / ShortCircuitOperator

We have an unusual, multiplexer-like use-case in our workflow:
                   +-----------------------+
                   |                       |
     +------------>+  branch-1.begin-task  |
     |             |                       |
     |             +-----------------------+
     |
     |             +-----------------------+
     |             |                       |
     +------------>+  branch-2.begin-task  |
     |             |                       |
+----+-------+     +-----------------------+
|            |
|  MUX-task  +- -- -- -- ->      ...
|            |
+----+-------+     +-----------------------+
     |             |                       |
     +------------>+  branch-n.begin-task  |
                   |                       |
                   +-----------------------+
The flow is expected to work as follows:
- the MUX-task listens for events on an external queue (a single queue)
- each event on the queue triggers execution of one of the branches (branch-n.begin-task)
- one by one, as events arrive, the MUX-task must trigger execution of the respective branch
- once all branches have been triggered, the MUX-task completes
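To make the intended semantics concrete, here is a minimal plain-Python simulation of the MUX-task's dispatch loop (no Airflow involved; the queue, the event payload field and the trigger callback are all hypothetical stand-ins):

```python
from collections import deque

def run_mux(queue, n, trigger_branch):
    """Consume exactly n events from a single queue; each event names the
    branch it should trigger. Completes once all n branches were triggered."""
    triggered = []
    while len(triggered) < n:
        event = queue.popleft()      # the real system would block/poll here
        branch = event["branch"]     # assumed payload field naming the branch
        trigger_branch(branch)       # e.g. make branch-<i>.begin-task runnable
        triggered.append(branch)
    return triggered

# usage: three events arrive one by one, each triggering a different branch
events = deque({"branch": f"branch-{i}.begin-task"} for i in (2, 1, 3))
order = run_mux(events, n=3, trigger_branch=lambda b: None)
# order preserves arrival order: branch-2, branch-1, branch-3
```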
Assumptions
- Exactly n events arrive on the queue, one for triggering each branch
- n is known only dynamically: its value is defined in a Variable
Limitations
- There is only one external queue where events arrive
- we can't have n queues (one per branch), since the set of branches grows over time (n is dynamically defined)
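For illustration, the dynamic branch count could be read at DAG-parse time to generate the branch tasks; a sketch assuming a Variable named branch_count (Variable.get is stubbed here so the snippet runs without Airflow):

```python
# Stand-in for airflow.models.Variable.get, so the sketch runs without Airflow.
# The Variable name "branch_count" is an assumption for illustration only.
_FAKE_VARIABLES = {"branch_count": "3"}

def variable_get(key, default_var=None):
    return _FAKE_VARIABLES.get(key, default_var)

# n is only known at parse time; branch task ids are generated from it
n = int(variable_get("branch_count", default_var="0"))
branch_task_ids = [f"branch-{i}.begin-task" for i in range(1, n + 1)]
```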
We have not been able to come up with a solution using Airflow's set of operators and sensors (or anything else available out-of-the-box in Airflow) to build this:
- Sensors can be used to listen for events on an external queue; but here we have to listen for multiple events, not one
- BranchPythonOperator can be used to trigger execution of a single branch out of many, but it immediately marks the remaining branches as skipped
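The all-or-nothing skip behaviour is the crux: once the branch callable returns a task id, every other downstream branch is marked skipped and can never be triggered by a later event. A rough plain-Python simulation of that semantics (not Airflow code):

```python
def branch_outcome(chosen_task_id, all_branch_task_ids):
    """Mimics BranchPythonOperator semantics: the chosen branch is
    scheduled, all remaining branches are immediately marked skipped."""
    return {
        task_id: ("scheduled" if task_id == chosen_task_id else "skipped")
        for task_id in all_branch_task_ids
    }

# After the first event picks branch-1, branches 2..n are already skipped,
# so later events have nothing left to trigger:
states = branch_outcome(
    "branch-1.begin-task",
    ["branch-1.begin-task", "branch-2.begin-task", "branch-3.begin-task"],
)
```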
Primary bottleneck
Because of the 2nd limitation above, even a custom operator combining the functionality of a Sensor and a BranchPythonOperator won't work.
We have tried to brainstorm around fancy combinations of Sensors, DummyOperators and trigger_rules to achieve this, but have had no success thus far.
Is this doable in Airflow?
UPDATE-1
Here's some background info to understand the context of the workflow:
- we have an ETL pipeline to sync MySQL tables (across multiple Aurora databases) to our data-lake
- to limit the impact of our sync pipeline on the production databases, we have decided to do this:
  - for each database, create a snapshot (restore an AuroraDB cluster from the last backup)
  - run the MySQL sync pipeline against that snapshot
  - at the end of the sync, terminate the snapshot (AuroraDB cluster)
- the lifecycle events of the Aurora snapshot-restore process are published to an SQS queue - a single queue for all databases
- this setup was done by our DevOps team (different AWS account; we don't have access to the underlying Lambdas / SQS / infra)