I have a rather complex recursive function with many parameters (the Obara-Saika scheme, in case anyone is wondering) that I would like to evaluate more efficiently.
As a first step I applied @functools.lru_cache. As a second step, I now want to use multiprocessing.Pool to asynchronously evaluate a long list of input parameters.
Adapting the second example from the functools Python docs and adding a pool of workers, I have:
from multiprocessing import Pool
from functools import lru_cache

@lru_cache(maxsize=10)
def fibonacci(n):
    print('calculating fibonacci(%i)' % n)
    if n < 2:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

with Pool(processes=4) as pool:
    for i in range(10):
        res = pool.apply_async(fibonacci, (i,))
        print(res.get())
print(fibonacci.cache_info())
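Incidentally, the cache_info() printed at the end only ever reports the parent process's cache; each worker holds its own private copy. A minimal demonstration of that isolation (square and worker_cache_info are throwaway names of mine):

```python
from multiprocessing import Pool
from functools import lru_cache

@lru_cache(maxsize=10)
def square(n):
    return n * n

def worker_cache_info(n):
    # runs inside a worker process: reports that worker's private cache
    square(n)
    return square.cache_info()

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        # the workers accumulate hits/misses in their own caches
        print(pool.map(worker_cache_info, [3, 3]))
    # the parent never called square(), so its cache is still empty
    print(square.cache_info())
```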
Question 1
How do I get the cache to be shared over the different workers? Another question (How to share a cache?) asks something similar, but I could not get it working. Here are my two failed approaches.
Using multiprocessing.Pool:
from multiprocessing import Pool
from functools import lru_cache
import time

@lru_cache(maxsize=10)
def fibonacci(n):
    print('calculating fibonacci(%i)' % n)  # log whether the function gets called
    if n < 2:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

res = []
with Pool(processes=4) as pool:
    # submit first task
    res.append(pool.apply_async(fibonacci, (5,)).get())
    # give fibonacci() some time to fill its cache
    time.sleep(1)
    # submit second task
    res.append(pool.apply_async(fibonacci, (3,)).get())
print(res)
Using concurrent.futures:
import concurrent.futures
from functools import lru_cache
import time

@lru_cache(maxsize=10)
def fibonacci(n):
    print('calculating fibonacci(%i)' % n)  # log whether the function gets called
    if n < 2:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    @lru_cache(maxsize=10)
    def fib_async(n):
        print('calculating fib_async(%i)' % n)
        if n < 2:
            return n
        return fibonacci(n-1) + fibonacci(n-2)

    res = []
    # submit first task
    res.append(executor.submit(fib_async, 5))
    # give fib_async() some time to fill its cache
    time.sleep(1)
    # submit second task
    res.append(executor.submit(fib_async, 3))
    res = [e.result() for e in res]
print(res)
Both produce basically the same output, showing that the second task recalculates fibonacci(2), although the first task already had to calculate it. How do I get the cache shared?
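For concreteness, one direction I have been looking at (a sketch only, not something I have benchmarked): drop lru_cache and pass around an explicit dict from multiprocessing.Manager, which lives in the manager process and is visible to every worker. The name fib_shared is mine, and every cache lookup costs an IPC round trip, which may well eat the savings for cheap calls:

```python
from multiprocessing import Pool, Manager

def fib_shared(n, cache):
    # cache is a Manager dict proxy: reads and writes go through the
    # manager process, so all workers see the same entries (at IPC cost)
    if n in cache:
        return cache[n]
    result = n if n < 2 else fib_shared(n-1, cache) + fib_shared(n-2, cache)
    cache[n] = result
    return result

if __name__ == '__main__':
    with Manager() as manager:
        cache = manager.dict()  # shared across workers via the manager
        with Pool(processes=4) as pool:
            jobs = [pool.apply_async(fib_shared, (i, cache)) for i in range(10)]
            print([job.get() for job in jobs])
        print('shared entries:', len(cache))
```

Note this still does not stop two workers from starting the same uncached call at the same time, which is exactly the gap Question 2 is about.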
This should speed things up a little, but there is still a problem when duplicate calls are badly timed: a call currently being evaluated by worker1 is not yet cached, so worker2 might start evaluating the same thing. Which brings me to:
Question 2
Calculating Fibonacci numbers is rather linear in its recursion, i.e. there is only one parameter being decremented. My function is more complex and I could use something that manages not only what input parameters have already been calculated, but keeps track of what is currently being calculated.
To be clear: I want to make many parallel calls to the recursive function which will spawn many new calls to the recursive function.
A tricky thing might be to avoid assigning one call directly to a worker, since this would cause deadlocks when the recursion depth exceeds the number of workers.
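To make the bookkeeping I have in mind concrete, here is a future-per-argument memoization sketch (fib_tracked and the module-level cache are hypothetical names of mine). A pending Future in the cache marks "currently being calculated", so duplicate callers wait on it instead of recomputing. It uses threads rather than worker processes, which sidesteps the deadlock because the recursion runs in the calling thread rather than being handed to a pool:

```python
import threading
from concurrent.futures import Future

cache = {}                     # argument -> Future (done or still pending)
cache_lock = threading.Lock()

def fib_tracked(n):
    # claim-or-wait: the first caller for a given n installs a pending
    # Future and computes it; later callers for the same n block on it
    with cache_lock:
        fut = cache.get(n)
        if fut is None:
            fut = Future()
            cache[n] = fut
            owner = True
        else:
            owner = False
    if owner:
        fut.set_result(n if n < 2 else fib_tracked(n-1) + fib_tracked(n-2))
    return fut.result()        # duplicates wait here instead of recomputing
```

With threads this only pays off when the per-call work releases the GIL (e.g. in a C extension); for true multi-process parallelism the same Future-style bookkeeping would need to live in some shared structure, which is what I am asking about.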
Is there already such a thing I could use, or do I need to build something of my own? I stumbled upon multiprocessing.managers and concurrent.futures.ProcessPoolExecutor, which might be helpful, but I could use some help getting started.