Questions tagged [airflow-taskflow]
67 questions
6
votes
2 answers
How do you access Airflow variables with task decorators using jinja templating?
I'm currently accessing an Airflow variable as follows:
from airflow.models import Variable
s3_bucket = Variable.get('bucket_name')
It works, but I'm being asked not to use the Variable module and to use Jinja templating instead, i.e.:
s3_bucket =…

Paul Chung
- 83
- 2
- 5
6
votes
1 answer
Airflow: How to pass data from a decorated task to SimpleHttpOperator?
I recently started using Apache Airflow. I am using the TaskFlow API with one decorated task with id Get_payload and a SimpleHttpOperator. The Get_payload task gets data from a database, does some data manipulation, and returns a dict as the payload.
Problem
Unable…

Dheemanth Bhat
- 4,269
- 2
- 21
- 40
5
votes
2 answers
How to write unittest for @task decorated Airflow tasks?
I am trying to write unit tests for some of the tasks built with the Airflow TaskFlow API. I tried multiple approaches, for example creating a dagrun or only running the task function, but nothing is helping.
Here is a task where I download a file from…

Sadan A.
- 1,017
- 1
- 10
- 28
4
votes
1 answer
Airflow: Importing decorated Task vs all tasks in a single DAG file?
I recently started using Apache Airflow and one of its new concepts, the TaskFlow API. I have a DAG with multiple decorated tasks where each task has 50+ lines of code, so I decided to move each task into a separate file.
After referring to Stack Overflow I…

Dheemanth Bhat
- 4,269
- 2
- 21
- 40
4
votes
1 answer
Airflow: Problem with creating dynamic task within TaskGroup
I'm trying to make a dynamic workflow.
I got this broken DAG error about a duplicate task id:
Broken DAG: [/opt/airflow/dags/academi_dag.py] Traceback (most recent call last):
File…

Tegar D Pratama
- 167
- 1
- 4
- 16
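The duplicate task id error in the question above typically appears when a loop creates several tasks under the same id. The following is a minimal plain-Python sketch of the idea, not Airflow code: the names `find_duplicate_ids`, `items`, and `load_data` are hypothetical and only illustrate deriving each id from the loop variable so that ids stay unique.

```python
from collections import Counter


def find_duplicate_ids(task_ids):
    """Return the ids that occur more than once, sorted alphabetically."""
    counts = Counter(task_ids)
    return sorted(task_id for task_id, n in counts.items() if n > 1)


# Hypothetical inputs that a dynamic workflow might loop over.
items = ["a", "b", "c"]

# Reusing one fixed id for every generated task causes a collision.
static_ids = ["load_data" for _ in items]

# Deriving the id from the loop variable keeps every id unique.
unique_ids = [f"load_data_{item}" for item in items]

print(find_duplicate_ids(static_ids))  # ['load_data']
print(find_duplicate_ids(unique_ids))  # []
```

In a real DAG the same principle would apply to whatever `task_id` (or group id) is passed when tasks are created inside the loop.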
4
votes
1 answer
Airflow TaskFlow - run tasks in parallel
Wanting to try the new TaskFlow API, I came to the point where I need to have 2 parallel tasks.
With Airflow v1 I used to do something like:
task_1 >> [task_2, task_3]
[task_2, task_3] >> task_4
The way we call the task is different now for…

Ragnar
- 2,550
- 6
- 36
- 70
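The dependency shape in the question above (task_1 fans out to task_2 and task_3, which both feed task_4) can be illustrated without Airflow. This is a sketch using the standard library's `graphlib`, not the TaskFlow API itself; the helper `execution_waves` and the `upstream` mapping are names assumed here for illustration. It shows which tasks become runnable at the same time under those dependencies.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
upstream = {
    "task_2": {"task_1"},
    "task_3": {"task_1"},
    "task_4": {"task_2", "task_3"},
}


def execution_waves(deps):
    """Group tasks into waves; tasks in the same wave could run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything whose upstreams are done
        waves.append(ready)
        sorter.done(*ready)
    return waves


waves = execution_waves(upstream)
print(waves)  # [['task_1'], ['task_2', 'task_3'], ['task_4']]
```

task_2 and task_3 land in the same wave because neither depends on the other, which is the property the `task_1 >> [task_2, task_3]` notation expresses.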
2
votes
0 answers
Airflow - How to handle tasks relationship for the tasks inside dynamic task group mapping
I have implemented dynamic task group mapping with a Python operator and a deferrable operator inside the task group.
I got stuck with controlling the relationship between the mapped instance values passed at runtime, i.e. when the deferrable operator…

saravana kumar
- 255
- 1
- 3
- 10
2
votes
1 answer
How can I create a shared child between two tasks using TaskFlow Api?
My code looks like this:
def etl():
    for item in ['FIRST','SECCOND','THIRD']:
        if item == 'a':
            requests = ['Data1','Data3']
        elif item == 'b':
            requests = ['Data1']
        for data_name in requests:
…

mrc
- 2,845
- 8
- 39
- 73
1
vote
0 answers
Different set of task from the user at runtime to airflow dag
The user provides input to the DAG from the UI through "Trigger DAG w/ config": the names of the taskflow functions to be run. On each DAG run, the function name (taskflow) may change, chosen from a set of supported functions.
At "Trigger DAG w/ config"
1st run
Input…

MadhyasN
- 21
- 4
1
vote
0 answers
Cannot find DAG on airflow dags list
It's the first time I have tried to create an Airflow DAG, but when I checked the DAG list in the terminal, the dag_id that I created was not there.
Is there anything wrong with my file or the directory of the DAG that I created?

Rattapong Pojpatinya
- 11
- 1
1
vote
0 answers
Apache Airflow Docker Swarm Operator tasks stuck in running
I'm using Airflow to run tasks in Docker Swarm using DockerSwarmOperator. After a DockerSwarmOperator task completes, the task remains running in Airflow even though the corresponding container has exited. Here is my docker-compose…

Ata Attarian
- 11
- 1
1
vote
0 answers
Apache Airflow: Clean a task with a config
I'm using Apache Airflow 2, and I can re-run the whole DAG using a config:
However, sometimes I want to re-run only one task, also with a config, rather than the whole DAG. For now I can clear the task, but I'm not able to add a config. The task I want to…

svg_af_2
- 75
- 5
1
vote
0 answers
Using Airflow, How to handle asynchronous API calls with Airflow Tasks?
I'm quite new to Airflow. I need to make asynchronous POST API calls to start the execution of an external service, and then make GET API calls to check the status of the execution, repeating that call until the execution completes.
I…

saravana kumar
- 255
- 1
- 3
- 10
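The start-then-poll pattern described in the question above can be sketched in plain Python. This is an illustration under stated assumptions, not Airflow code: `wait_for_completion`, `start_job`, `get_status`, and the status strings are all hypothetical, and the two callables stand in for the POST and GET requests. In Airflow the polling part is usually expressed with a sensor or a deferrable operator rather than a loop inside one task.

```python
import time


def wait_for_completion(start_job, get_status, poll_interval=1.0,
                        max_polls=10, sleep=time.sleep):
    """Start a job, then poll its status until it reaches a terminal state.

    start_job:  callable standing in for the POST call; returns a job id.
    get_status: callable standing in for the GET call; takes the job id.
    """
    job_id = start_job()
    for _ in range(max_polls):
        status = get_status(job_id)
        if status in ("succeeded", "failed"):  # assumed terminal states
            return job_id, status
        sleep(poll_interval)  # wait before asking again
    raise TimeoutError(f"job {job_id} did not finish after {max_polls} polls")


# Usage with fake callables instead of real HTTP requests.
statuses = iter(["running", "running", "succeeded"])
result = wait_for_completion(
    start_job=lambda: "job-1",
    get_status=lambda job_id: next(statuses),
    sleep=lambda seconds: None,  # skip real waiting in this demo
)
print(result)  # ('job-1', 'succeeded')
```

Passing the callables in keeps the loop independent of any particular HTTP client, and bounding the number of polls avoids waiting forever on a job that never reports a terminal state.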
1
vote
0 answers
Airflow retries long running task multiple times while the task is still running
I have a long-running task that loops through calls to a REST endpoint to get data, maybe hundreds of times, and it can take up to 1 hour.
While the task is still running, I see there are multiple attempts or retries, even though I specifically…

nismoh
- 53
- 1
- 7
1
vote
1 answer
Branching in Apache Airflow using TaskFlowAPI
I can't find the documentation for branching in Airflow's TaskFlow API. I tried doing it the "Pythonic" way, but when run, the DAG does not see task_2_execute_if_true, regardless of the truth value returned by the previous task.
@dag(
…

matwasilewski
- 384
- 2
- 11