I have 30 individual tasks in a DAG, and they have no dependencies on each other. The tasks run the same code; the only difference is the data volume. Some tasks finish in seconds, while others take 2 hours or more.
The problem is that during catchup, the tasks that finish in seconds are blocked by the tasks that take hours: they cannot move on to the next execution date until the slow ones finish.
I could break them up into individual DAGs, but that seems silly, and the 30 tasks will grow to a larger number in the future.
Is there any way to run tasks in the same DAG at different execution dates? That is, as soon as a task finishes, it takes on the next execution date, regardless of how the other tasks are doing.
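To make the difference concrete, here is a minimal pure-Python model of the two behaviours (not Airflow code). It assumes, hypothetically, that in the blocked case a new execution date starts only once every task of the previous date is done, which roughly matches what happens during catchup when only one DAG run is active at a time. The function names and durations are illustrative.

```python
from typing import Dict, List


def lockstep_finish_times(durations: Dict[str, float], n_runs: int) -> Dict[str, List[float]]:
    """Hypothetical model: execution date k+1 starts only after ALL tasks of date k finish."""
    finish: Dict[str, List[float]] = {name: [] for name in durations}
    run_start = 0.0
    for _ in range(n_runs):
        for name, duration in durations.items():
            finish[name].append(run_start + duration)
        # The next execution date waits for the slowest task of this one.
        run_start += max(durations.values())
    return finish


def independent_finish_times(durations: Dict[str, float], n_runs: int) -> Dict[str, List[float]]:
    """Hypothetical model: each task moves to its next execution date as soon as it finishes."""
    return {name: [duration * (k + 1) for k in range(n_runs)]
            for name, duration in durations.items()}


if __name__ == "__main__":
    durations = {"fast": 1.0, "slow": 7200.0}  # seconds; illustrative values
    print(lockstep_finish_times(durations, 3)["fast"])     # [1.0, 7201.0, 14401.0]
    print(independent_finish_times(durations, 3)["fast"])  # [1.0, 2.0, 3.0]
```

In the lockstep model the fast task finishes its third execution date after about four hours; in the independent model it does so after three seconds, which is the behaviour asked for here.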
Adding a picture for illustration. Basically, I'd like to see two more solid green boxes in the first row while the third row is still running behind.
Edit:
After y2k-shubham's explanation, I tried to implement it, but it's still not working. The fast task starts at 2019-01-30 00, finishes in a second, and does not start 2019-01-30 01 because the slow task is still running. Ideally, 2019-01-30 01, 2019-01-30 02, 2019-01-30 03, ... would run in parallel.
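For reference, with `schedule_interval='@hourly'` and catchup, the scheduler creates one DAG run per hourly period that has already ended, which is where execution dates like these come from. A minimal sketch of that enumeration, assuming Airflow 1.x semantics where the run for a period is triggered once the period is over; the helper name and the `now` value are hypothetical.

```python
from datetime import datetime, timedelta
from typing import List


def catchup_execution_dates(start: datetime, now: datetime,
                            interval: timedelta = timedelta(hours=1)) -> List[datetime]:
    """Execution dates whose interval has fully elapsed by `now` (hypothetical helper)."""
    dates = []
    current = start
    # A run for `current` is due only once the period [current, current + interval) is over.
    while current + interval <= now:
        dates.append(current)
        current += interval
    return dates


if __name__ == "__main__":
    start = datetime(2019, 1, 30, 0, 0, 0)  # start_date used in the code example
    now = datetime(2019, 1, 30, 4, 0, 0)    # illustrative "current" time
    for d in catchup_execution_dates(start, now):
        print(d.strftime("%Y-%m-%d %H"))    # 2019-01-30 00 ... 2019-01-30 03
```

All four runs are due at once; whether they actually execute concurrently is then governed by settings such as the DAG's `max_active_runs` and the pools.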
Here is a code example:
import time
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'test',
    'depends_on_past': False,  # a task instance does not wait for its own previous execution date
    'start_date': datetime(2019, 1, 30, 0, 0, 0),
    'trigger_rule': TriggerRule.DUMMY,  # dependencies are just for show: trigger regardless of upstream state
}

dag = DAG(dag_id='test_dag', default_args=default_args, schedule_interval='@hourly')


def fast(**kwargs):
    # Finishes almost immediately.
    return 1


def slow(**kwargs):
    # Simulates a long-running task (10 minutes).
    time.sleep(600)
    return 1


fast_task = PythonOperator(
    task_id='fast',
    python_callable=fast,
    provide_context=True,
    priority_weight=10000,  # high priority so fast instances are scheduled first
    pool='fast_pool',
    # weight_rule='upstream',  # using 1.9, this param doesn't exist
    dag=dag,
)

slow_task = PythonOperator(
    task_id='slow',
    python_callable=slow,
    provide_context=True,
    priority_weight=500,
    pool='slow_pool',
    # weight_rule='upstream',  # using 1.9, this param doesn't exist
    dag=dag,
)

fast_task >> slow_task  # not working: fast does not move on to the next execution date while slow is running
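The intent of the two pools above can be modelled with a toy slot scheduler: each pool has a fixed number of slots, and task instances of successive execution dates take the earliest free slot. This is a simplified sketch, not Airflow's scheduler. It assumes the DAG runs are allowed to overlap (for example, a high enough `max_active_runs`), models each task on its own, and uses made-up slot counts.

```python
import heapq
from typing import List


def pool_finish_times(duration: float, n_runs: int, slots: int) -> List[float]:
    """Finish time per execution date for one task whose instances share a pool.

    Toy model: runs are queued in execution-date order, and each takes the
    earliest-free slot of its pool; nothing else blocks it.
    """
    free_at = [0.0] * slots  # time at which each slot becomes free
    heapq.heapify(free_at)
    finishes = []
    for _ in range(n_runs):
        start = heapq.heappop(free_at)  # earliest available slot
        end = start + duration
        finishes.append(end)
        heapq.heappush(free_at, end)
    return finishes


if __name__ == "__main__":
    # Hypothetical sizes: fast_pool with 2 slots, slow_pool with 1 slot.
    print(pool_finish_times(1.0, 4, slots=2))    # [1.0, 1.0, 2.0, 2.0]
    print(pool_finish_times(600.0, 4, slots=1))  # [600.0, 1200.0, 1800.0, 2400.0]
```

In this model the fast task clears four execution dates within 2 seconds while the slow task is still on its first. The model only holds if the scheduler actually lets several DAG runs be active at once; if it doesn't, separate pools alone would not remove the blocking.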