
So I have a bunch of functions that don't depend on each other to do their work, and each of them takes quite some time. So I thought I would save runtime if I could use multiple threads. For example:

axial_velocity = calc_velocity(data_axial, factors_axial)
radial_velocity = calc_velocity(data_radial, factors_radial)
circumferential_velocity = calc_velocity(data_circ, factors_circ)

All my variables so far are lists (pretty long lists too)

I have to do this for every input file, and this takes hours if there are more than 200... (I expect about 1000+)

To reduce the runtime I tried to recompute the data as little as possible (especially the sanity checks), which helped greatly, but the next improvement would be using one thread for each set of data.

I've tried something like this (oversimplified):

from multiprocessing import Pool

def calc_velocity(data, factor):
    buffer_list = []
    for index, line in enumerate(data):
        buffer_list.append(data[index] * factor[index])
    return buffer_list

data_axial = [1, 2, 3]
factors_axial = [3, 2, 1]

if __name__ == '__main__':
    p = Pool(4)
    axial_velocity = p.map(calc_velocity, args = (data_axial, factors_axial))

and:

from multiprocessing import Process


def calc_velocity(data_pack):
    data = []
    factor = []
    data.extend(data_pack[0])
    factor.extend(data_pack[1])
    buffer_list = []
    for index, line in enumerate(data):
        buffer_list.append(data[index] * factor[index])
    return buffer_list


data_axial = [1, 2, 3]
factors_axial = [3, 2, 1]

if __name__ == '__main__':
    data_pack = []
    data_pack.append(data_axial)
    data_pack.append(factors_axial)
    p = Process(target = calc_velocity, args = data_pack)
    p.start()
    p.join()
    print p

None of these work, and I can't figure out how to make them work.

Lucy The Brazen

3 Answers


When I want to do multiprocessing in Python I use threads; the following code should be an example of using threads in Python:

from threading import Thread
import time

def time1(a, b):
    print a
    time.sleep(10)
    print time.time(), a
    return b

def time2(c, d):
    print c
    time.sleep(10)
    print time.time(), c
    return d

if __name__ == '__main__':
    # target: the function name (pointer),
    # args: a tuple of the arguments that you want to send to your function
    t1 = Thread(target = time1, args=(1, 2))
    t2 = Thread(target = time2, args=(3, 4))

    # start the functions:
    a = t1.start()
    b = t2.start()
    print a
    print b

As you can see in this code, threads can't return a value directly, so there are two ways to get a value out of a thread: you can write the output to a file and then read that file back (in a try/except block), or you can change a shared/global value to what you want to return. If you still want to use multiprocessing, you can find some help here: how to get the return value from a thread in python?
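
For example, here is one minimal sketch of the shared-object approach; the results dict and the 'axial' key are just made up for illustration:

from threading import Thread

def calc_velocity(data, factor, results, key):
    # store the result in a shared dict instead of returning it
    results[key] = [d * f for d, f in zip(data, factor)]

if __name__ == '__main__':
    results = {}
    t = Thread(target=calc_velocity, args=([1, 2, 3], [3, 2, 1], results, 'axial'))
    t.start()
    t.join()  # wait for the thread to finish before reading the result
    print(results['axial'])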

Hope that was helpful.

messy212
  • Python threads don't enable parallel execution - there's only one thread running at a time, and threads would make the OP's task actually execute slower due to context switches, GIL tracking and the like. – zwer Jul 12 '17 at 14:41
  • I know that; the threads don't run one function to the end at a time - they run some of one function, then some of the other, and repeat again and again until the functions finish. If you have a few functions that don't depend on each other to do their stuff, threads are an easy way to do things. Anyway, there is a link to a multiprocessing question, which can be helpful if you want to return stuff from your functions. And when you run this code the same time is printed, and you wait only 10 seconds for both functions, not 20. – messy212 Jul 12 '17 at 14:53
  • If you were aware of that, what part of the OP's "_To reduce the runtime..._" quest made you suggest threading as a solution for the OP's problem? – zwer Jul 12 '17 at 15:08
  • Try to run the code; it takes only 10 seconds and not 20, so it does reduce the run-time. I always use it when I need to run two things at once; I used it on a server that serves multiple users at the same time and it works. Anyway, I also suggested a multiprocessing solution - there are a lot of questions on Stack Overflow about running two things at once in Python. – messy212 Jul 12 '17 at 16:56
  • No it doesn't, just because you `time.sleep()` your threads doesn't mean they are running in parallel / at once - `time.sleep()` just causes the context switch. Try adding something that actually uses the CPU (i.e. `while True: a *= b`) instead of `time.sleep()` and see what happens then. Just because it can be used in certain contexts (I/O mainly) to speed up execution doesn't mean that it actually does - read more on [GIL](https://wiki.python.org/moin/GlobalInterpreterLock). – zwer Jul 12 '17 at 17:00
  • Well, it has worked for me a lot of times, and as I said, I wrote server code with this and it worked just fine. – messy212 Jul 12 '17 at 17:13
  • @zwer Just wanted to say that I looked at your answer, and thank you, I learned a lot; I tried your code a few times and understood it. BTW I can't comment on your post yet. – messy212 Jul 12 '17 at 17:41

If you don't need each result the moment it completes, a simple multiprocessing.Pool.map() is more than enough to split your task into separate processes that run in parallel, e.g.:

import multiprocessing

def worker(args):  # a worker function invoked for each sub-process
    data, factor = args[0], args[1]  # Pool.map() sends a single argument so unpack them
    return [e * factor[i] for i, e in enumerate(data)]

if __name__ == "__main__":  # important process guard for cross-platform use
    calc_pool = multiprocessing.Pool(processes=3)  # we only need 3 processes
    data = (  # pack our data for multiprocessing.Pool.map() ingestion
        (data_axial, factors_axial),
        (data_radial, factors_radial),
        (data_circ, factors_circ)
    )
    # run our processes and await responses
    axial_velocity, radial_velocity, circumferential_velocity = calc_pool.map(worker, data)
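
If you did need each result as soon as it completes, Pool.imap_unordered() yields results in completion order instead of waiting for all of them; a minimal sketch, reusing the worker() and calc_pool from above:

    # inside the __main__ guard, instead of calc_pool.map(...):
    for velocity in calc_pool.imap_unordered(worker, data):
        print(velocity)  # handle each velocity list as soon as its process finishes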

However, the concerning part of your question lies in the hint that you have a large amount of data to pass around - when Python uses multiprocessing it doesn't share its memory, and while at least on systems with fork it can use a copy-on-write optimization, passing data between processes always invokes an extremely slow pickle-unpickle routine to pack and send the data.

For that reason, make sure the amount of data you exchange is minimal - for example, if you were loading data_axial and factors_axial from a file, it's better to just send the file path(s) and let the worker() process load/parse the file(s) itself than to load the file in your main process and then send over the loaded data.
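
A rough sketch of that idea - the load_and_parse() helper and the file names here are assumptions for illustration, not something from your code:

import multiprocessing

def load_and_parse(path):
    # hypothetical parser - replace with however your data files are actually read
    with open(path) as f:
        return [float(line) for line in f]

def worker(args):
    path, factors = args
    data = load_and_parse(path)  # the sub-process does the heavy loading itself
    return [e * factors[i] for i, e in enumerate(data)]

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    jobs = [("axial.dat", factors_axial),    # factors_* are your existing lists
            ("radial.dat", factors_radial),
            ("circ.dat", factors_circ)]
    axial_velocity, radial_velocity, circumferential_velocity = pool.map(worker, jobs)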

If you need to frequently (and randomly) access large amounts of (mutable) shared data from your sub-processes, I'd suggest using an in-memory database for the task, like Redis.
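
A minimal sketch of that idea, assuming a Redis server running locally and the third-party redis package; the key name is made up for illustration:

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# main process: store the shared factors once
r.rpush('factors_axial', *[3, 2, 1])

# in a worker process: read the shared factors instead of pickling them through Pool
factors = [float(x) for x in r.lrange('factors_axial', 0, -1)]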

zwer
  • Thanks for the answer, I'll look into opening the needed file in the subprocess, because I don't really need it in the main process anyways. – Lucy The Brazen Jul 12 '17 at 15:58
  • Now implemented and running. The fun thing is, it took me about half an hour to rewrite the functions and everything, but it took me half a day of tweaking to actually get better performance... – Lucy The Brazen Jul 13 '17 at 09:53

Your first example is almost there. However, Pool.map() doesn't take an args keyword argument. Additionally, Pool.map() only lets you pass a single argument to the function. To pass multiple arguments, you have to pack them into another structure, like a tuple, as you did in your second example.

This modified version of your first example works.

from multiprocessing import Pool

def calc_velocity(work_args):
    buffer_list = []
    for index, line in enumerate(work_args[0]):
        buffer_list.append(work_args[0][index] * work_args[1][index])
    return buffer_list

data_axial = [1, 2, 3]
factors_axial = [3, 2, 1]

if __name__ == '__main__':
    p = Pool(4)
    work_args = (data_axial, factors_axial)
    axial_velocity = p.map(calc_velocity, [work_args])[0]  # map() returns a list of results, one per work item
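
As a side note, if you are on Python 3.3+, Pool.starmap() unpacks the argument tuples for you, so calc_velocity could keep its original two-argument signature; a minimal sketch:

from multiprocessing import Pool

def calc_velocity(data, factor):
    return [d * f for d, f in zip(data, factor)]

if __name__ == '__main__':
    with Pool(4) as p:
        # each tuple in the list is unpacked into (data, factor)
        axial_velocity, = p.starmap(calc_velocity, [([1, 2, 3], [3, 2, 1])])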

If the calc_velocity function is actually representative of your function, then you could use numpy's multiply function to make it easier (and faster). Your calc_velocity function would just be:

import numpy

def calc_velocity(work_args):
    return numpy.multiply(work_args[0], work_args[1])
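
Note that numpy.multiply() returns a numpy array rather than a list, so if the rest of the code expects plain lists you can convert the result back, e.g.:

def calc_velocity(work_args):
    return numpy.multiply(work_args[0], work_args[1]).tolist()
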
ahota
  • This looks great, I'll totally try it out tomorrow at work. Also sadly my `calc_velocity` isn't that simple, but I think the code is long enough without the whole function. – Lucy The Brazen Jul 12 '17 at 15:54