
Let's say I have a list with 100 items called mylist.

I have a function that performs a certain operation on each element of the list. In order to speed things up, I want to define n parallel tasks, where each task takes 100/n list items and processes them.

I understand the executor and core settings I need to change to enable parallelism; I just need some basic pointers on how to set up the tasks.

My idea is to build it like this:

imports...

mylist = [item0, item1, ..., item99]
n = 5  # number of parallel tasks
chunk = len(mylist) // n  # 100 / 5 = 20 items per task


def myfunction(sub_list):
    """Process the given slice of the list; runs within the DAG execution."""


# Generate 5 tasks, one per chunk of 20 items
for i in range(0, len(mylist), chunk):
    task = PythonOperator(
        task_id='myfunction_' + str(i),
        python_callable=myfunction,
        op_kwargs={'sub_list': mylist[i:i + chunk]},
        dag=dag,
    )

I've assembled this according to the documentation. Is this the proper way of doing it?

Hrvoje
    Does this answer your question? [Airflow Generate Dynamic Tasks in Single DAG , Task N+1 is Dependent on TaskN](https://stackoverflow.com/questions/52558018/airflow-generate-dynamic-tasks-in-single-dag-task-n1-is-dependent-on-taskn) – SergiyKolesnikov Mar 31 '21 at 16:00
  • @SergiyKolesnikov tasks should run in parallel, in that example they are running sequentially. – Hrvoje Apr 01 '21 at 00:03
  • Just scroll down https://stackoverflow.com/a/52558172/6942134 – SergiyKolesnikov Apr 01 '21 at 08:28
  • @SergiyKolesnikov that question deals with sequential task running. I cannot mark it as resolving my question, since that would be confusing to people. It's true that its second answer confirms my proposed approach, which is correct. – Hrvoje Apr 01 '21 at 08:41

1 Answer


My initial proposition is correct: you can run the tasks in parallel, with each task receiving its own portion of the list as input, like this:

# Airflow 2.x import paths; under Airflow 1.10.x these live in
# airflow.operators.python_operator and airflow.operators.dummy_operator
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

dag = DAG('parallel_list_processing', start_date=datetime(2021, 1, 1), schedule_interval=None)

mylist = [item0, item1, ..., item99]
n = 5  # number of parallel tasks; also the number of splits of the list
chunk = len(mylist) // n  # 100 / 5 = 20 items per task


def myfunction(sub_list):
    """Process the given slice of the list; runs within the DAG execution."""


# Generate a dummy initial task for the parallel tasks to hang off
d1 = DummyOperator(task_id='kick_off_dag', dag=dag)

# Generate the 5 parallel tasks, one per chunk
for i in range(0, len(mylist), chunk):
    parallel_task = PythonOperator(
        task_id='myfunction_' + str(i),
        python_callable=myfunction,
        op_kwargs={'sub_list': mylist[i:i + chunk]},
        dag=dag,
    )
    # Set the dependency inside the loop so every task is wired to d1
    d1 >> parallel_task

Where range(0, len(mylist), chunk) steps through the list in chunk-sized strides, so each task receives the slice mylist[i:i + chunk] as its input.
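A quick way to sanity-check the splitting outside Airflow is a plain-Python sketch (integers stand in for the real list items):

```python
# Stand-in data: 100 integers play the role of mylist's items
mylist = list(range(100))
n = 5                      # number of parallel tasks
chunk = len(mylist) // n   # 20 items per task

# One chunk-sized slice per task, the same slices passed via op_kwargs
sub_lists = [mylist[i:i + chunk] for i in range(0, len(mylist), chunk)]

print(len(sub_lists))      # 5 chunks -> 5 tasks
print(len(sub_lists[0]))   # 20 items in each chunk
```

Together the slices cover the whole list exactly once, so no element is skipped or processed twice.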

Calder White
Hrvoje
  • Should `d1 >> five_parallelTasks` be within the for? Or am I misunderstanding how this works? – Kevin Woodward Mar 31 '22 at 17:24
  • @KevinWoodward some education about creating DAGs is needed: https://marclamberti.com/blog/airflow-dag-creating-your-first-dag-in-5-minutes/ – Hrvoje Apr 01 '22 at 06:52
  • This reassigns `five_parallelTasks` within the loop without using it. It's also not being added to a list. Your final line will simply add the dependency between `d1` and the task from the result of the last iteration of your loop. – Kevin Woodward Apr 04 '22 at 23:10
  • @KevinWoodward not really, my solution is similar to this one: https://stackoverflow.com/a/67623392/2119941 put it in practice and see if it works. – Hrvoje Apr 05 '22 at 07:27
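For what it's worth, the behaviour discussed in these comments follows from plain Python name binding, which can be checked with a toy stand-in for task objects (the `Task` class below is hypothetical, mimicking Airflow's `>>` syntax, not the real API):

```python
class Task:
    """Toy stand-in for an Airflow operator, for illustration only."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []

    def __rshift__(self, other):
        # Mimic Airflow's `d1 >> task` dependency syntax
        other.upstream.append(self)
        return other


d1 = Task('kick_off_dag')
for i in range(5):
    t = Task('myfunction_' + str(i))

# Outside the loop, `t` is only the task from the last iteration,
# so only that one task is wired to d1 here.
d1 >> t

print(t.task_id)        # myfunction_4
print(len(t.upstream))  # 1
```

Placing `d1 >> t` inside the loop body wires every generated task to `d1`; either way the tasks still run in parallel, since none of them depends on another.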