0

I'm trying to parallelize some code that uses partial functions to generate random numbers for a simulation I'm working on. With the following code:

#!/usr/bin/env python3 

import functools
import random
import pathos
from itertools import starmap
from time import sleep
from datetime import datetime

def example(func1, func2):
    sleep(1)
    [a, b] = [func1(), func2()]
    return (f"arg #1 is {round(a,2)}, arg #2 is {round(b,2)} at {datetime.now().time()}")

rand1 = functools.partial(random.uniform, 100, 199)
rand2 = functools.partial(random.uniform, 200, 299)
rand3 = functools.partial(random.uniform, 300, 399)

argsToRun = [(rand1, rand2), (rand2, rand3), (rand1, rand3)]    # 3 ordered combinations...

print(f"running with a for loop...")
for args in argsToRun:
    result = example(*args)
    print(result)


print("\nRunning with itertools.starmap...")
results = starmap(example, argsToRun)
print("\n".join(results))


print("\nRunning with pathos.mp.starmap...")    
with pathos.helpers.mp.Pool() as pool:
    results = pool.starmap(example, argsToRun)
print("\n".join(results))

I get the following output...

running with a for loop...
arg #1 is 134.5, arg #2 is 232.45 at 11:58:17.025493
arg #1 is 213.38, arg #2 is 306.7 at 11:58:18.027038
arg #1 is 107.3, arg #2 is 347.19 at 11:58:19.028476

Running with itertools.starmap...
arg #1 is 167.7, arg #2 is 247.96 at 11:58:20.030238
arg #1 is 235.97, arg #2 is 318.02 at 11:58:21.031543
arg #1 is 140.41, arg #2 is 387.51 at 11:58:22.032727

Running with pathos.mp.starmap...
arg #1 is 120.24, arg #2 is 208.23 at 11:58:23.100251  
arg #1 is 220.24, arg #2 is 308.23 at 11:58:23.112206  
arg #1 is 120.24, arg #2 is 308.23 at 11:58:23.126050  

The problem is that when I parallelize this, the random functions are NOT evaluated differently each time. They're only being evaluated once (or the result is somehow getting reused over and over...) if you look at the last block, the values from the random functions passed in are not changing. I put the timestamps in there to convince myself that the last block actually WAS being executed in parallel.

I'm sure it has something to do with when and how the tuple-parameters for the function calls are evaluated, but at this point I'm lost.

The very high level goal is to be able to build a (very large) list of parameters to pass into a simPy environment, and have the pool execute on them in parallel. But until I can figure out how to get the randomness to work, I'm stuck doing this at 1/32 of the speed I need.

ljwobker
  • 832
  • 2
  • 10
  • 20
  • Having random numbers generated from one generator across multiple processes at the same time is probably not a good idea. Maybe have a producer pre-populate a queue of random values that the processes consume from? – Carcigenicate Nov 28 '19 at 17:23
  • The point is to have different random distributions for different runs. I don't follow why this would be a bad idea - I need variance in the numbers, not the same numbers each time. – ljwobker Nov 28 '19 at 17:41
  • 1
    Because typically, unless and object is meant to be multiprocess/threading safe, manipulating an object from multiple processes/threads at the same time can mess with the internal state of the object and cause weird behavior; like it giving the same output twice. – Carcigenicate Nov 28 '19 at 17:45
  • 1
    See [here](https://stackoverflow.com/questions/10021882/make-the-random-module-thread-safe-in-python) for how this can be resolved. Try creating multiple generators instead of using the global one for everything. – Carcigenicate Nov 28 '19 at 17:47
  • Sorry - I don't understand how/why this mechanism would manipulate an object from multiple processes at the same time. The function is passed down through to the ultimate location it gets called, and then should be evaluated there independently, right? I'm sure there's something I'm missing but I'm not seeing it yet. (Sorry!) – ljwobker Nov 29 '19 at 15:44
  • Every time `func1` and `func2` are called inside of `example`, you're having the global random generator produce a value, which changes the internal state of the generator. `example` is passed to `starmap`, and `starmap` has the potential to call `example` multiple times at the same time. If `example` is called multiple times at the same time, you'll potentially have multiple calls to `func1` and `func2` happening at the same time, which means that the random number generator is being manipulated by multiple processes at the same time. – Carcigenicate Nov 29 '19 at 16:01
  • And just to clarify, in my above comment, by `starmap`, I'm referring to `pool.starmap`, not `itertools`' `starmap`. The `itertools` version doesn't run in parallel, so it's fine. – Carcigenicate Nov 29 '19 at 16:07
  • OK, I found https://stackoverflow.com/questions/10021882/make-the-random-module-thread-safe-in-python, which looks helpful -- but it's not clear to me where I need to provide the separate instances of the RNG. Do I have to actually pass the local_random instance to each function call? (I think not...) – ljwobker Nov 29 '19 at 16:37
  • It may be easier to pre-generate the random numbers, store them in a multi-process-safe queue (pathos should have one), and have `example` pop numbers from the queue as needed. – Carcigenicate Nov 29 '19 at 16:39
  • Or maybe asked a different way: what's the right way to get a thread-safe version of random into each instance of example called by starmap? Is this possible without changing the signature of example()? And the pregeneration sort of makes sense, but in my use case I need to use a bunch of different random.XXX distributions in the actual program. – ljwobker Nov 29 '19 at 16:39

1 Answers1

0

So I got this working, but I'm pretty sure it's still a total hack. I ended up taking just the parameters for the functions and passing them to a method that uses a local instance of random...

#!/usr/bin/env python3 

import functools
import random
import pathos
from itertools import starmap
from time import sleep
from datetime import datetime

def example( func1, func2):
    sleep(0.5)
    [a, b] = [func1(), func2()]
    return (f"values {round(a,2)},  {round(b,2)}   at {datetime.now().time()}")

def threadSafe(func1, func2):
    sleep(0.5)
    localRandom = random.Random()
    meth1 = getattr(localRandom, func1.func.__name__, func1.args)
    meth2 = getattr(localRandom, func2.func.__name__, func2.args)
    local_f1 = functools.partial(meth1, *func1.args)
    local_f2 = functools.partial(meth2, *func2.args)
    [a, b] = [local_f1(), local_f2()]
    return (f"values {round(a,2)},  {round(b,2)}   at {datetime.now().time()}")


rand1 = functools.partial(random.uniform, 100, 199)
rand2 = functools.partial(random.uniform, 200, 299)
rand3 = functools.partial(random.uniform, 300, 399)

argsToRun = [(rand1, rand2), (rand2, rand3), (rand1, rand3)]    # 3 ordered combinations...


print(f"running with a for loop...")
for args in argsToRun:
    result = example(*args)
    print(result)


print("\nRunning with itertools.starmap...")
results = starmap(example, argsToRun)
print("\n".join(results))


print("\nRunning threadSafe with pathos.mp.starmap...")    
with pathos.helpers.mp.Pool() as pool:
    results = pool.starmap(threadSafe, argsToRun)
print("\n".join(results))
ljwobker
  • 832
  • 2
  • 10
  • 20