Apache Airflow is a workflow management platform to programmatically author, schedule, and monitor workflows as directed acyclic graphs (DAGs) of tasks. Use this tag for questions about version 2+ of Airflow. Use the more generic [airflow] tag on all Airflow questions, and only add this one if your question is version-specific.
Questions tagged [airflow-2.x]
690 questions
7 votes, 1 answer
MWAA in production - tasks queued for unknown reasons
Does anyone use MWAA in production?
We currently have around 500 DAGs running, and we see unexpected behavior with tasks staying in a "queued" state for unknown reasons.
Task is in the 'queued' state which is not a valid state for
execution. The…

val
7 votes, 0 answers
New DAGs / Changes to DAGs not being picked up by Airflow
Problem:
New DAGs or changes to existing DAGs are not showing up on the Airflow web server for use in the app.
For example, suppose I add a new DAG to the DAG directory:
What does work:
If I run $ airflow dags list, then the DAG does show up.
Similarly…

robertwest
7 votes, 3 answers
Docker Compose file for Airflow 2 (version 2.0.0)
I am looking to write a Docker Compose file to run Airflow locally in a production-like environment.
For the older Airflow v1.10.14, Docker Compose is working fine. But the same Docker Compose file is not working for the latest stable version, airflow scheduler &…

avikm
6 votes, 3 answers
How to import custom modules in Cloud Composer
I created a local project with Apache Airflow and I want to run it in Cloud Composer. My project contains custom modules and a main file that calls them.
Example: from src.kuzzle import KuzzleQuery
Structure:
main.py
src/
    kuzzle.py
I have…

Oussama Fathallah
5 votes, 1 answer
Connection pooling for external connections in Airflow
I am trying to find a way to manage connection pooling for external connections created in Airflow.
Airflow version: 2.1.0
Python version: 3.9.5
Airflow DB: SQLite
External connections created: MySQL and Snowflake
I know there are properties…

Shashank Gupta
5 votes, 1 answer
Airflow XCOM communication from BashOperator to SSHOperator
I just began learning Airflow, but I find the concept of XCom quite difficult to grasp. Therefore I wrote a DAG like this:
from airflow import DAG
from airflow.utils.edgemodifier import Label
from datetime import datetime
from datetime import…

3LexW
5 votes, 4 answers
How to set up LDAP authentication in Airflow 2.0
I am currently attempting to set up LDAP integration with an existing LDAP server in Airflow. In the past, I have attempted making a cacert (ldap_ca.crt) and have followed this guide and this guide.
When I start up Airflow I am presented with a login…

Dan
4 votes, 2 answers
Airflow S3 connection type is missing
I'm using Airflow 2.5.1 and Python 3.10. I need to create an S3 connection under Admin > Add connection, but the S3 connection type is missing from the dropdown.
I already installed the provider apache-airflow-providers-amazon and also…

Dutt
4 votes, 1 answer
Dynamic Task Mapping with Decorators in Airflow 2.3
I want a whole task group to run on the output of a single task, where both task and task group are defined via decorators - @task and @task_group respectively.
Somewhat similar to
For that, I updated one of the examples provided by Airflow. The…

Índio
4 votes, 1 answer
My DAG config/params aren't being passed to my task
I am passing some run-time DAG params/config to a PythonOperator in a very similar way to these Airflow docs:
def print_x(x):
    print(f"x is {x}")

with DAG(
    "print_x",
    start_date=pendulum.datetime(2022, 6, 15, tz="UTC"),
    …

LondonRob
4 votes, 0 answers
How to use Custom built REST API to authenticate airflow webserver login?
I am using Airflow 2.0.1.
I have created a REST API, shown below, that takes a username and password and returns whether the user is authenticated.
API:
http://localhost:port/api/authenticate
Request Body:
{"username": "user", "password": "pswd"}
Now, I…

Arshpreet Kaur
4 votes, 1 answer
Create dynamic tasks depending on the result of an SQL query in Airflow
I am trying to create dynamic tasks with TaskGroup, saving the result in a variable. The variable is modified every N minutes depending on a database query, but when the variable is modified the second time, the scheduler breaks down.
Basically I need…

Francisco
4 votes, 2 answers
Airflow Scheduler liveness probe crashing (version 2.0)
I have just upgraded my Airflow from 1.10.13 to 2.0. I am running it in Kubernetes (Azure AKS) with the Kubernetes Executor. Unfortunately, I see my scheduler getting killed every 15-20 minutes because the liveness probe fails. Hence my pod keeps…

stoicky
3 votes, 1 answer
Setting up Docker image with R and Snowflake Drivers
Requirement: Execute an R script that connects from R to a Snowflake database.
I am trying to set up a Docker image that can communicate with a Snowflake database through R (using either RODBC or ODBC).
Error:
The problem seems to be that it…

Karthik
3 votes, 0 answers
Airflow Dynamic Task Mapping identifiers in UI
With the new Dynamic Task Mapping feature, it's possible to use the expand function and create a variable number of tasks based on the output of a previous task.
The problem is that, as shown in the documentation, the UI will only show the total…

Nicolò Gasparini