1

I'm trying to run a bunch of simulations in Python, so I tried implementing it with multiprocessing.

import numpy as np
import matplotlib.pyplot as plt
import multiprocessing as mp
import psutil

from Functions import hist, exp_fit, exponential

N = 100000  # Number of observations
tau = 13362.525  # decay rate to simulate
iterations = 1  # Number of iterations for each process
bin_size = 1*1e9 # in nanoseconds

def spawn(queue):
    results = []
    procs = list()
    n_cpus = psutil.cpu_count()
    for cpu in range(n_cpus):
        affinity = [cpu]
        d = dict(affinity=affinity)
        p = mp.Process(target=run_child, args=[queue], kwargs=d)
        p.start()
        procs.append(p)
    for p in procs:
        results.append(queue.get)
        p.join()
        print('joined')
    return results

def run_child(queue, affinity):
    proc = psutil.Process()  # get self pid
    proc.cpu_affinity(affinity)
    print(affinity)
    np.random.seed()
    for i in range(iterations):
        time = np.sort(-np.log(np.random.uniform(size=N)) * tau) * 1e9
        n, bins = hist(time, bin_size)
        fit = exp_fit(n, bins, silent=True)
        queue.put(fit)

if __name__ == '__main__':
    output = mp.Queue()
    plt.figure()
    results = spawn(output)
    bins = range(1000)
    for fit in results:
        plt.plot(bins, exponential(fit.params, bins), 'k-', alpha=0.1)
    plt.show()

My attempt is heavily inspired by this answer I found while trying to find a solution myself, where the affinity of each process is manually set as numpy apparently changes the default behaviour (it only runs on a single core if this is not done).

I think the code mostly works; each process performs a simulation and fit as intended, but I cannot figure out how to extract the results. As it is now, the queue.put(fit) in the run_child method seems to cause the program to halt.

Any ideas as to why this happens, and how to fix it?

eyllanesc
  • 235,170
  • 19
  • 170
  • 241
Kristian
  • 121
  • 6
  • If the `queue.put(fit)` blocks the execution, that means that the queue is waiting to get a free slot to insert the fit variable. It will block if the queue is full. – Pierre-Nicolas Piquin Feb 16 '19 at 15:52
  • 1
    And are sure about the `results.append(queue.get)` ? It might be `results.append(queue.get())` – Pierre-Nicolas Piquin Feb 16 '19 at 15:52
  • That was an obvious mistake - I've tried a few different approaches, and must've missed it the last time around. Correcting it doesn't fix the hangs, though. How do I know if the queue is full? Is there some fixed storage capacity I have to be aware of? – Kristian Feb 16 '19 at 17:01
  • 1
    Weird thing, I tried your code by replacing your `hist` and `exp_fit` by `time.sleep(5)` and everything worked fine. Are you sure it's `queue.put(fit)` that is blocking your code ? – Pierre-Nicolas Piquin Feb 16 '19 at 17:28
  • And a queue can have a maximum size if you set it in the constructor. But, in your case, it should have an infinite size so it won't be the queue that is blocking your code. – Pierre-Nicolas Piquin Feb 16 '19 at 17:30
  • 1
    I found the problem! The queues do not support the fit data type (OptimizeResult), so by extracting only the fitting parameters I needed from the fit, putting them in a list and passing them to the queue worked like a charm. Thank you for the help! – Kristian Feb 16 '19 at 18:18
  • Hi there, welcome to SO! Glad you got it sorted. Please avoid answering questions in the comments, instead use the answer box below. [It's OK to answer your own questions](https://stackoverflow.blog/2011/07/01/its-ok-to-ask-and-answer-your-own-questions/)! – grooveplex Feb 16 '19 at 18:24

1 Answers1

1

The problem was trying to pass an OptimizeResult data type to the queue. Extracting only the necessary data from the fit and passing that instead worked like a charm.

Thanks to Pierre-Nicolas Piquin for helping solve it!

Kristian
  • 121
  • 6