Questions tagged [airflow-taskflow]

67 questions
6
votes
2 answers

How do you access Airflow variables with task decorators using jinja templating?

I'm currently accessing an Airflow variable as follows: from airflow.models import Variable s3_bucket = Variable.get('bucket_name') It works but I'm being asked to not use the Variable module and use jinja templating instead (i.e.): s3_bucket =…
Paul Chung
  • 83
  • 2
  • 5
6
votes
1 answer

Airflow: How to pass data from a decorated task to SimpleHttpOperator?

I recently started using Apache airflow. In am using Taskflow API with one decorated task with id Get_payload and SimpleHttpOperator. Task Get_payload gets data from database, does some data manipulation and returns a dict as payload. Probelm Unable…
Dheemanth Bhat
  • 4,269
  • 2
  • 21
  • 40
5
votes
2 answers

How to write unittest for @task decorated Airflow tasks?

I am trying to write unittests for some of the tasks built with Airflow TaskFlow API. I tried multiple approaches for example, by creating a dagrun or only running the task function but nothing is helping. Here is a task where I download a file from…
Sadan A.
  • 1,017
  • 1
  • 10
  • 28
4
votes
1 answer

Airflow: Importing decorated Task vs all tasks in a single DAG file?

I recently started using Apache Airflow and one of its new concept Taskflow API. I have a DAG with multiple decorated tasks where each task has 50+ lines of code. So I decided to move each task into a separate file. After referring stackoverflow I…
Dheemanth Bhat
  • 4,269
  • 2
  • 21
  • 40
4
votes
1 answer

Airflow: Problem with creating dynamic task within TaskGroup

I'm trying to make a dynamic workflow. I got this broken DAG error duplicate task id Broken DAG: [/opt/airflow/dags/academi_dag.py] Traceback (most recent call last): File…
4
votes
1 answer

Airflow taskflow - run task in parallele

Wanted to try the new taskflow API I came to the point where I need to have 2 parallels task. With Airflow v1 I was use to do something like task_1 >> [task_2, task_3] [task_2, task_3] >> task_4 The way we call the task is different now for…
Ragnar
  • 2,550
  • 6
  • 36
  • 70
2
votes
0 answers

Airflow - How to handle tasks relationship for the tasks inside dynamic task group mapping

I have implemented dynamic task group mapping with a Python operator and a deferrable operator inside the task group. I got stuck with controlling the relationship between mapped instance value passed during runtime i.e when the deferrable operator…
saravana kumar
  • 255
  • 1
  • 3
  • 10
2
votes
1 answer

How can I create a shared child between two tasks using TaskFlow Api?

My code look like this: def etl(): for item in ['FIRST','SECCOND','THIRD']: if item == 'a': requests = ['Data1','Data3'] elif item == 'b': requests = ['Data1'] for data_name in requests: …
mrc
  • 2,845
  • 8
  • 39
  • 73
1
vote
0 answers

Different set of task from the user at runtime to airflow dag

User provides input to dag from UI through "Trigger DAG w/ config" , which are the name of taskflow to be run. At each dag run, function name(taskflow) may change from a set of function that are supported. At "Trigger DAG w/ config" 1st run Input…
MadhyasN
  • 21
  • 4
1
vote
0 answers

Cannot find DAG on airflow dags list

enter image description here It's the first time I tried to create airflow dag but when I checked the dag lists in terminal, there is no dag_id that I created Is there anything wrong with my file or the directory of the dag the I created
1
vote
0 answers

Apache Airflow Docker Swarm Operator tasks stuck in running

I'm using Airflow for running tasks in docker swarm using DockerSwarmOperator. After DockerSwarmOperator task gets completed, the task remains running in Airflow while the corresponding container has been exited. Here is my docker-compose…
1
vote
0 answers

Apache Airflow: Clean a task with a config

I'm using Apache Airflow 2, I can re-run the whole dag using a config: However sometimes I want to re-run only one task and not the whole dag with a config also, for now I can clear the task, but I'm not able to add a config, the task I want to…
svg_af_2
  • 75
  • 5
1
vote
0 answers

Using Airflow, How to handle asynchronous API calls with Airflow Tasks?

I'm quite new to Airflow, I need to make asynchronous POST API calls to start the execution of the external service. And Need to make GET API calls to check the status of the execution and have to make that call until the execution gets completed. I…
1
vote
0 answers

Airflow retries long running task multiple times while the task is still running

I have a long running task where it loops through calling some REST endpoint to get data maybe hundreds of times and could take up to 1 hour. While the task is still running, I see there are multiple attempts or retries, even though I specifically…
nismoh
  • 53
  • 1
  • 7
1
vote
1 answer

Branching in Apache Airflow using TaskFlowAPI

I can't find the documentation for branching in Airflow's TaskFlowAPI. I tried doing it the "Pythonic" way, but when ran, the DAG does not see task_2_execute_if_true, regardless of truth value returned by the previous task. @dag( …
matwasilewski
  • 384
  • 2
  • 11
1
2 3 4 5