
I am trying to run a function in parallel for multiple files, and I want all of the calls to finish before a certain point.

For example, there is a loop:

def main():
    for item in items:
        function_x(item)

    function_y(items)

What I want is for function_x to run in parallel for all items, but every one of those calls must finish before function_y is called.

I am planning to use Celery for this, but I can't figure out how to do it.

Pe Dro
Gaurav Kumar Singh

5 Answers


Here is my final test code.

All I needed was the multiprocessing library.

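from multiprocessing import Process
from time import sleep


def function_x(i):
    # simulate slow work for item i
    for _ in range(5):
        sleep(3)
        print(i)


def function_y():
    print("done")


def main():
    processes = []
    for i in range(3):
        print("Process started")
        p = Process(target=function_x, args=(i,))
        processes.append(p)
        p.start()

    # block until all the processes finish (i.e. until all function_x calls finish)
    for p in processes:
        p.join()

    function_y()


# Required when child processes are started with "spawn" (the default on
# Windows and macOS): each child re-imports this module, and an unguarded
# main() call would try to start new processes from inside the child.
if __name__ == "__main__":
    main()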
Prayag Gordy
Gaurav Kumar Singh
  • This gives me the following error: RuntimeError: An attempt has been made to start a new process before the current process has finished its bootstrapping phase. This probably means that you are not using fork to start your child processes and you have forgotten to use the proper idiom in the main module: if __name__ == '__main__': freeze_support() The "freeze_support()" line can be omitted if the program is not going to be frozen to produce an executable. – Sreevalsa E Mar 06 '20 at 09:53
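The RuntimeError quoted above is what multiprocessing raises when a child process started with the "spawn" method (the default on Windows and macOS) re-imports the main module and reaches code that starts processes at import time. Below is a minimal sketch of the usual fix, which also swaps the hand-managed Process list for multiprocessing.Pool: Pool.map blocks until every call has returned, so function_y only runs afterwards. The function bodies are placeholders, and run_all is a hypothetical helper name, not part of the answer above.

```python
from multiprocessing import Pool


def function_x(item):
    # placeholder work; must be a top-level function so it can be pickled
    return item * item


def function_y(results):
    # placeholder for the step that must wait for every function_x call
    return sum(results)


def run_all(items, processes=4):
    # hypothetical helper: Pool.map blocks until all calls have returned
    with Pool(processes=processes) as pool:
        results = pool.map(function_x, items)
    return function_y(results)


if __name__ == "__main__":
    # the guard keeps child processes that re-import this module
    # from starting a new pool of their own
    print(run_all(range(10)))
```

Pool.map also returns the results in input order, which the Process-based version has no built-in way to do.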

You can use threads for this. Thread.join is the method you need: it blocks until the thread has finished.
You can do this:

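import threading

def function_x(item):
    # placeholder for the real per-item work
    print("processing", item)

def function_y(items):
    # placeholder for the work that must wait for every function_x call
    print("all done:", items)

def main(items):
    threads = []
    for item in items:
        t = threading.Thread(target=function_x, args=(item,))
        threads.append(t)
        t.start()

    # block until all the threads finish (i.e. until all function_x calls finish)
    for t in threads:
        t.join()

    function_y(items)

if __name__ == "__main__":
    main([1, 2, 3])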
Elisha
  • Thanks, Elisha, for the approach. But I wanted to run these processes in parallel, and two threads cannot work on the same code simultaneously. I found the solution: I just needed to use multiprocessing instead of threading, and most things stay the same. https://docs.python.org/2/library/multiprocessing.html Read this for a better understanding. – Gaurav Kumar Singh Sep 18 '14 at 14:08
  • Threads do run simultaneously. That is not the difference between threads and processes. – Elisha Sep 18 '14 at 14:22
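One gap in the threading approach is that Thread does not hand back function_x's return value. A minimal sketch, assuming function_x returns something function_y needs, is to have each thread write into its own slot of a pre-sized list; join() guarantees every slot is filled before function_y runs. The function bodies and the run_all name are hypothetical. On the threads-vs-processes point in the comments: threads do run concurrently, but in the standard CPython build the global interpreter lock lets only one thread execute Python bytecode at a time, so threads mostly help with I/O-bound work, while CPU-bound work usually needs processes.

```python
import threading


def function_x(item):
    # placeholder work
    return item * 10


def function_y(results):
    # placeholder for the step that needs every result
    return sum(results)


def run_all(items):
    items = list(items)
    results = [None] * len(items)  # one slot per item; each thread writes only its own

    def worker(index, item):
        results[index] = function_x(item)

    threads = [threading.Thread(target=worker, args=(i, item))
               for i, item in enumerate(items)]
    for t in threads:
        t.start()
    # join() blocks until each thread is done, so every slot is filled afterwards
    for t in threads:
        t.join()
    return function_y(results)


if __name__ == "__main__":
    print(run_all([1, 2, 3]))
```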

You can do this elegantly with Ray, which is a library for writing parallel and distributed Python.

Simply decorate function_x with @ray.remote. It can then be executed in parallel by calling function_x.remote, and the results can be retrieved with ray.get, which blocks until all of them are ready.

import ray
import time

ray.init()

@ray.remote
def function_x(item):
    time.sleep(1)
    return item

def function_y(items):
    pass

items = [1, 2, 3, 4]

# Process the items in parallel; ray.get blocks until every task has finished.
results = ray.get([function_x.remote(item) for item in items])

function_y(items)

View the Ray documentation.

Robert Nishihara

The Celery documentation on groups describes what I think you want. Use AsyncResult.get() instead of AsyncResult.ready() to block: get() waits for the result, while ready() only checks whether it is available.

joshua
#!/usr/bin/env python3

import concurrent.futures


def function_x(item):
    return item * item


def function_y(lst):
    return [x * x for x in lst]


a_list = range(10)


if __name__ == '__main__':

    with concurrent.futures.ThreadPoolExecutor(10) as tp:

        # map each future back to the item it was submitted for
        future_to_function_x = {
            tp.submit(function_x, item): item
            for item in a_list
        }

    # leaving the "with" block waits for every submitted call to finish
    results = {}

    for future in concurrent.futures.as_completed(future_to_function_x):

        item = future_to_function_x[future]

        try:
            res = future.result()
        except Exception as e:
            print('Exception when processing item "%s": %s' % (item, e))
        else:
            results[item] = res

    # as_completed yields futures in completion order, so sort by item
    # to get a deterministic order for printing and for function_y
    results = dict(sorted(results.items()))

    print('results:', results)

    after = function_y(results.values())

    print('after:', after)

Output:

results: {0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 7: 49, 8: 64, 9: 81}
after: [0, 1, 16, 81, 256, 625, 1296, 2401, 4096, 6561]
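Because leaving the with block already waits for every submitted call, the same result can be had more compactly with Executor.map, which returns results in input order. A minimal sketch, reusing the answer's toy function_x and function_y; run and max_workers=4 are illustrative choices, not part of the answer. For CPU-bound work, ProcessPoolExecutor offers the same interface (it then needs the if __name__ == '__main__' guard on spawn-based platforms).

```python
from concurrent.futures import ThreadPoolExecutor


def function_x(item):
    # same toy work as in the answer: square the item
    return item * item


def function_y(lst):
    return [x * x for x in lst]


def run(items, max_workers=4):
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() submits every call and yields the results in input order
        results = list(pool.map(function_x, items))
    # the with block has exited, so every function_x call has finished here
    return function_y(results)


if __name__ == "__main__":
    print(run(range(10)))
```

If any function_x call raises, list() re-raises that exception, so function_y never sees partial results.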
DevLounge