
I want to execute f1 and f2 at the same time, but the following code doesn't work!

from multiprocessing import Pool

def f1(x):
    return x*x

def f2(x):
    return x^2

if __name__ == '__main__':
    x1 = 10
    x2 = 20
    p = Pool(2)
    out = (p.map([f1, f2], [x1, x2]))

y1 = out[0]
y2 = out[1]
Mohammad
  • You cannot run multiple threads at the exact same time, if that is what you want; starting a thread takes some time by itself. There will always be a time difference, so why not start these threads separately, one after another? See [the docs](https://docs.python.org/2/library/multiprocessing.html) for reference. – Jezor Jul 02 '16 at 12:05
  • May I ask why you want to do that? – itmuckel Jul 02 '16 at 12:11
  • @Micha90 My functions are very time-consuming (5 days to run on clusters!), so to save time, I need to run them in parallel. – Mohammad Jul 02 '16 at 12:16
  • @Jezor If there is gonna be a time difference, that's fine, I only need to run them in parallel! – Mohammad Jul 02 '16 at 12:17
  • Okay, I haven't had time to read the docs, but have you tried `out[0]=p.map(f1, x1)` and `out[1]=p.map(f2, x2)`? Maybe the library can't handle multiple functions? Just a guess... – itmuckel Jul 02 '16 at 12:46
  • @HighPerformanceMark another function will generate the inputs and the results need to be compared and... – Mohammad Jul 02 '16 at 15:08

2 Answers


I believe you'd like to use threading.Thread and a shared queue in your code.

from queue import Queue
from threading import Thread
import time

def f1(q, x):
    # Sleep function added to compare execution times.
    time.sleep(5)
    # Instead of returning the result we put it in shared queue.
    q.put(x * 2)

def f2(q, x):
    time.sleep(5)
    q.put(x ^ 2)

if __name__ == '__main__':
    x1 = 10
    x2 = 20
    result_queue = Queue()

    # We create two threads and pass shared queue to both of them.
    t1 = Thread(target=f1, args=(result_queue, x1))
    t2 = Thread(target=f2, args=(result_queue, x2))

    # Starting threads...
    print("Start: %s" % time.ctime())
    t1.start()
    t2.start()

    # Waiting for threads to finish execution...
    t1.join()
    t2.join()
    print("End:   %s" % time.ctime())

    # After threads are done, we can read results from the queue.
    while not result_queue.empty():
        result = result_queue.get()
        print(result)

The code above should print output similar to:

Start: Sat Jul  2 20:50:50 2016
End:   Sat Jul  2 20:50:55 2016
20
22

As you can see, even though both functions wait 5 seconds before yielding their results, they do it in parallel, so the overall execution time is about 5 seconds.

If you care about which function put which result in your queue, I can see two solutions that will let you determine that: you can either create multiple queues or wrap your results in a tuple. Both are sketched below.

def f1(q, x):
    time.sleep(5)
    # Tuple containing function information.
    q.put((f1, x * 2))
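The multiple-queues alternative is just as simple; here is a minimal sketch of my own (using the f1 and f2 from the first snippet), where each function gets a dedicated queue:

if __name__ == '__main__':
    x1 = 10
    x2 = 20
    # One queue per function, so each result's origin is unambiguous.
    q1 = Queue()
    q2 = Queue()

    t1 = Thread(target=f1, args=(q1, x1))
    t2 = Thread(target=f2, args=(q2, x2))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    y1 = q1.get()  # result of f1
    y2 = q2.get()  # result of f2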

And for further simplification (especially when you have many functions to deal with) you can decorate your functions (to avoid repeated code and to allow function calls without a queue):

def wrap_result(func):
    def wrapper(*args):
        # Assuming that the shared queue is always the last argument.
        q = args[-1]
        # We use it to store the results only if it was provided.
        if isinstance(q, Queue):
            function_result = func(*args[:-1])
            q.put((func, function_result))
        else:
            function_result = func(*args)
        return function_result

    return wrapper

@wrap_result
def f1(x):
    time.sleep(5)
    return x * 2

Note that my decorator was written in a rush and its implementation might need improvements (in case your functions accept kwargs, for instance). If you decide to use it, you'll have to pass your arguments in reverse order: t1 = threading.Thread(target=f1, args=(x1, result_queue)).
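If your functions do accept kwargs, one possible refinement is to pass the queue as a keyword argument instead, which also removes the reversed-order quirk. This is my own sketch, not part of the original answer, and the result_queue parameter name is illustrative:

import functools

def wrap_result(func):
    @functools.wraps(func)
    def wrapper(*args, result_queue=None, **kwargs):
        # Call the wrapped function with its own arguments untouched.
        function_result = func(*args, **kwargs)
        # Store the result only if a queue was actually provided.
        if result_queue is not None:
            result_queue.put((func, function_result))
        return function_result
    return wrapper

@wrap_result
def f1(x):
    time.sleep(5)
    return x * 2

# The queue is now passed by name:
# t1 = Thread(target=f1, args=(x1,), kwargs={'result_queue': result_queue})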

A little friendly advice.

"Following code doesn't work" says nothing about the problem. Is it raising an exception? Is it giving unexpected results?

It's important to read error messages, and even more important to study their meaning. The code you provided raises a TypeError with a pretty obvious message:

File ".../stack.py", line 16, in <module>
    out = (p.map([f1, f2], [x1, x2]))

TypeError: 'list' object is not callable

That means the first argument of Pool().map() has to be a callable object, a function for instance. Let's look at the docs for that method.

Apply func to each element in iterable, collecting the results in a list that is returned.

It clearly doesn't allow a list of functions to be passed as its argument.
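For contrast, a working call maps a single callable over an iterable of inputs (using the f1 from the question; this is just an illustration):

if __name__ == '__main__':
    p = Pool(2)
    # One callable, applied to each input in a worker process.
    print(p.map(f1, [10, 20]))  # prints [100, 400]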

Here you can read more about the Pool().map() method.

Jezor
  • Thank you so much for the reply. It was very helpful. I'm wondering, if f1 takes longer to calculate than f2, will the first line in the 'result' be for f2? I tested that for a simple function and the results corresponded with the functions (f1 and f2). Should I implement your second comment in the code as well? – Mohammad Jul 03 '16 at 11:19
  • Yes, values will be inserted into the queue ordered by function execution time, nothing counterintuitive here (functions are working in parallel; the first to finish execution is the first to yield its result). It all depends on what your intent is: if you need to know which result corresponds to which function, then yes, you have to make the results distinctive. – Jezor Jul 03 '16 at 12:28
  • Thanks for your 'friendly advice', I just saw that:) I knew that the code I provided doesn't work and I just wrote it in order to say what I want to do with my code...Your first comment for calling different functions at the same time was really helpful, but for solving aforementioned problem, I actually didn't get what to do exactly! When I added the second comment to the code, it gives me the following error: NameError: "global name 'wrapper' is not defined" .....would you please explain it a bit more on how I should implement the rest of the code...Thanks in advance. – Mohammad Jul 03 '16 at 23:51
  • Well, if the minimal solution works for you, there's no point to add additional layer of pythonish obfuscation to your code. (; You can read more about decorators which I used [here](http://simeonfranklin.com/blog/2012/jul/1/python-decorators-in-12-steps/). Are you sure that you didn't miss any line from the code I wrote in your script? – Jezor Jul 04 '16 at 00:03
  • I'm a bit confused (I'm not good at python programming...). For solving my problem you said: "You can either create multiple queues or wrap your results in a tuple". Does that mean I should add both your second and third comment to the code? Or should I add them before this line in the first comment: while not result_queue.empty(): result = result_queue.get() print(result), and it will do the job? Thanks – Mohammad Jul 04 '16 at 10:21
  • Thanks Jezor, I've managed to fix that :) Actually, I put a number in the output of each function and then I sort out the results of the functions based on their numbers! – Mohammad Jul 04 '16 at 10:51
  • @Mohammad glad to hear that! If you are using plain integers, take a look at [python's enumeration](https://docs.python.org/3/library/enum.html). It might improve your code readability. Also, if my answer was helpful, consider giving an upvote and accepting it. (; – Jezor Jul 04 '16 at 11:02
  • @Jezor Sure. But as I'm new to this website, I cannot vote now :( I gave an upvote and I think it will apply in the future once I get more points :) – Mohammad Jul 04 '16 at 11:14

I want to execute f1 and f2 at the same time, but the following code doesn't work! ...

out=(p.map([f1, f2], [x1, x2]))

The minimal change to your code is to replace the p.map() call with:

r1 = p.apply_async(f1, [x1])
out2 = f2(x2)
out1 = r1.get()
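Assembled into a complete script with the functions from the question, that minimal change could look as follows (using Pool(1) is my choice here, since only f1 runs in a worker while f2 runs in the parent):

from multiprocessing import Pool

def f1(x):
    return x*x

def f2(x):
    return x^2

if __name__ == '__main__':
    x1 = 10
    x2 = 20
    p = Pool(1)
    r1 = p.apply_async(f1, [x1])  # schedule f1 in the worker process
    out2 = f2(x2)                 # meanwhile, run f2 in the parent
    out1 = r1.get()               # block until f1's result is ready
    print(out1, out2)             # 100 22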

Though if all you want is to run two function calls concurrently, you don't need the Pool() here; you could just start a Thread/Process manually and use a Pipe/Queue to get the result:

#!/usr/bin/env python
from multiprocessing import Process, Pipe

# f1, f2, x1 and x2 are defined as in the question.

def another_process(f, args, conn):
    # Run f in this process and send its result back over the pipe.
    conn.send(f(*args))
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe(duplex=False)
    p = Process(target=another_process, args=(f1, [x1], child_conn))
    p.start()
    out2 = f2(x2)              # f2 runs in the parent meanwhile
    out1 = parent_conn.recv()  # f1's result from the child process
    p.join()
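A multiprocessing.Queue version of the same idea is a near drop-in replacement. This sketch is mine, not from the original answer; run_in_process is an illustrative helper name, and f1, f2, x1 and x2 are again defined as in the question:

from multiprocessing import Process, Queue

def run_in_process(f, args, q):
    # Run f(*args) in the child process and put the result on the queue.
    q.put(f(*args))

if __name__ == '__main__':
    q = Queue()
    p = Process(target=run_in_process, args=(f1, [x1], q))
    p.start()
    out2 = f2(x2)
    out1 = q.get()  # read the result before joining the child
    p.join()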
jfs