2

I'm trying to perform numerical integration on a large array and the computation takes a very long time. I tried to speed up my code by using numba and the jit decorator, but numpy.trapz isn't supported.

My new idea would be to create n-many threads to run the calculations in parallel, but I was wondering how I could do this or if it was even feasible?

Referencing Below Code

Can I make sz[2] many threads to run at the same time that calls ZO_SteadState to calculate values?

    for i in range(sz[1]):
        phii = phi[i]
        for j in range(sz[2]):
            s = tau[0, i, j, :].reshape(1, n4)
            [R3, PHI3, S3] = meshgrid(rprime, phiprime, s)
            BCoeff = Bessel0(bm * R3)

            SS[0, i, j] = ZO_SteadyState(alpha, b,bm,BCoeff,Bessel_Denom, k2,maxt,phii, PHI2, PHI3, phiprime,R3,rprime,s,S3, T,v)

The calculation in question.

@jit()
def ZO_SteadyState(alpha, b,bm,BCoeff,Bessel_Denom, k2,maxt,phii, PHI2, PHI3, phiprime,R3,rprime,s,S3, T,v):
    g = 1000000 * exp(-(10 ** 5) * (R3 - (b / maxt) * S3) ** 2) * (
            exp(-(10 ** 5) * (PHI3 - 0) ** 2) + exp(-(10 ** 5) * (PHI3 - 2 * np.pi) ** 2) + exp(
        -(10 ** 5) * (PHI3 - 2 * np.pi / 3) ** 2) + exp(
        -(10 ** 5) * (PHI3 - 4 * np.pi / 3) ** 2))  # stationary point heat source.

    y = R3 * ((np.sqrt(2) / b) * (1 / (np.sqrt((H2 ** 2 / bm ** 2) + (1 - (v ** 2 / (bm ** 2 * b ** 2))))))
              * (BCoeff / Bessel_Denom)) * np.cos(v * (phii - PHI3)) * g

    x = (np.trapz(y, phiprime, axis=0)).reshape(1, 31, 300)

    # integral transform of heat source. integral over y-axis
    gbarbar = np.trapz(x, rprime, axis=1)

    PHI2 = np.meshgrid(phiprime, s)[0]

    sz2 = PHI2.shape
    f = h2 * 37 * Array_Ones((sz2[0], sz[1]))  # boundary condition.

    fbar = np.trapz(np.cos(v * (phii - PHI2)) * f, phiprime, 1).reshape(1, n4)  # integrate over y

    A = (alpha / k) * gbarbar + ((np.sqrt(2) * alpha) / k2) * (
                1 / (np.sqrt((H2 ** 2 / bm ** 2) + (1 - (v ** 2 / (bm ** 2 * b ** 2)))))) * fbar

    return np.trapz(exp(-alpha * bm ** 2 * (T[0, i, j] - s)) * A, s)
NoviceCoder
  • 424
  • 1
  • 4
  • 20
  • Yes, you could multiprocess this pretty easily with `multiprocessing.Pool`. You would want subprocesses though, not threads. – CJR Oct 21 '18 at 19:49
  • @CJ59 I think threads are quite OK here, since just about all of the caluclations are done within the `numpy` library, which allows for multiple threads to run in parallel. And threads mean less data shuffling, so it might be that it is actually quicker with threads. – JohanL Oct 21 '18 at 19:51
  • Maybe, but if you're getting maximum advantage out of the openmp parts of numpy then threading won't help much. If you're not, then threading won't help (but subprocessing will). – CJR Oct 21 '18 at 20:02
  • @CJ59 Why would you not get the OpenMP advantages when using threading? There should be no intrinsic problem combining them as far as I can tell. – JohanL Oct 21 '18 at 21:50

2 Answers2

1

Another concept implementation, with processes spawning processes (EDIT: jit tested):

import numpy as np

# better pickling
import pathos 
from contextlib import closing


from numba import jit

#https://stackoverflow.com/questions/47574860/python-pathos-process-pool-non-daemonic
import multiprocess.context as context
class NoDaemonProcess(context.Process):
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

class NoDaemonPool(pathos.multiprocessing.Pool):
    def Process(self, *args, **kwds):
        return NoDaemonProcess(*args, **kwds)




# matrix dimensions
x = 100 # i
y = 500 # j

NUM_PROCESSES = 10 # total NUM_PROCESSES*NUM_PROCESSES will be spawned

SS = np.zeros([x, y], dtype=float)

@jit
def foo(i):
    return (i*i + 1)
@jit
def bar(phii, j):
    return phii*(j+1)

# The code which is implemented down here:
'''
for i in range(x):
    phii = foo(i)
    for j in range(y):
        SS[i, j] = bar(phii, j)
'''

# Threaded version:
# queue is in global scope


def outer_loop(i):

    phii = foo(i)

    # i is in process scope
    def inner_loop(j):
        result = bar(phii,j)
        # the data is coordinates and result
        return (i, j, result)


    with closing(NoDaemonPool(processes=NUM_PROCESSES)) as pool:
        res = list(pool.imap(inner_loop, range(y)))
    return res

with closing(NoDaemonPool(processes=NUM_PROCESSES)) as pool:    
    results = list(pool.imap(outer_loop, range(x)))

result_list = []
for r in results:
    result_list += r


# read results from queue
for res in result_list:
    if res:
        i, j, val = res
        SS[i,j] = val


# check that all cells filled
print(np.count_nonzero(SS)) # 100*500

EDIT: Explanation.

The reason of all the complications in this code is that I wanted to do more parallelization than OP asked for. If only inner loop is parallelized, then the outer loop remains, so for each iteration of outer loop new process pool is created and computations for inner loop are performed. As long, as it seemed for me, that formula does not depend on other iterations of outer loop, I decided to parallelize everything: now computations for outer loop are assigned to processes from the pool, after that each of the "outer-loop" processes creates its own new pool, and additional processes are spawned to perform computations for inner loop.

I might be wrong though and outer loop must not be parallelized; In this case you can leave only inner process pool.

Using process pools might be not optimal solutions though as time will be consumed on creation and destruction of pools. More efficient (but requiring mode handwork) solution will be to instantiate N processes once and for all, and then feed data into them and receive results using multiprocessing Queue(). So you should test first whether this multiprocessing solution gives you enough speedup (This will happen if time on constructing and destructing pools is small in comparison to Z0_SteadyState run).

The next complication, is that artificial no-daemon pool. Daemon process is used to gracefully stop application: when main program exits, daemon processes are terminated silently. However, daemon process can not spawn child processes. Here in your example you need to wait until each process ends to retrieve data, so I made them non-daemon to allow spawning child processes to compute inner loop.

Data exchange: I suppose that the amount of data which is needed to fill matrix and time to do it is small in comparison to actual computations. So I use pools and pool.imap function (which is a bit faster than .map(). You can also try .imap_unordered(), however in your case it should not make significant difference). Thus inner pool waits until all the results are computed and returns them as list. The outer pool thus returns list of lists which must be concatenated. Then the matrix is reconstructed from these results in single fast loop.

Notice with closing() thing: it closes pool automatically after things under this statement are finished, avoiding memory consumption by zombie processes.

Also, you might notice that I weirdly defined one function inside another, and inside processes I have access to some variables which have not been passed there: i, phii. This happens because processes have access to the global scope from which they were launched with copy-on-change policy (default fork mode). This does not involve pickling and is fast.

The last comment is about using pathos library instead of standard multiprocessing, concurrent.futures, subprocess, etc. The reason is that pathos has better pickling library used, so it can serialize functions which standard libraries can't (for example, lambda functions). I don't know about your function, so I used more powerful tool to avoid further problems.

And the very last thing: multiprocessing vs threading. You can change pathos processing pool to, say, standard ThreadPoolExecutor from concurrent.futures, as I did on the beginning when I just started that code. But, during execution, on my system CPU is loaded only on 100% (i.e. one core is utilized, it appears like all 8 cores are loaded at 15-20%). I am not that skilled to understand differences between threads and processes, but it seems for me, that processes allow to utilize all cores (100% each, 800% total).

Slowpoke
  • 1,069
  • 1
  • 13
  • 37
  • Sorry, I'm unfamiliar with the multiprocessing library. Can you explain a little of how you've proposed solution works in regards to the multiprocessing aspects? – NoviceCoder Oct 21 '18 at 20:38
  • I've added some explanations in the update of my post. – Slowpoke Oct 22 '18 at 10:45
0

This is the overall idea that I'd probably do. There's not enough context to give you a more reliable example. You'll have to set all your variables into the class.

import multiprocessing

pool = multiprocessing.Pool(processes=12)
runner = mp_Z0(variable=variable, variable2=variable2)

for i, j, v in pool.imap(runner.run, range(sz[1]):
    SS[0, i, j] = v


class mp_Z0:

    def __init__(self, **kwargs):
        for k, v in kwargs:
            setattr(self, k, v)


    def run(self, i):
        phii = self.phi[i]
        for j in range(self.sz[2]):
            s = self.tau[0, i, j, :].reshape(1, self.n4)
            [R3, PHI3, S3] = meshgrid(self.rprime, self.phiprime, s)
            BCoeff = Bessel0(self.bm * R3)

            return (i, j, ZO_SteadyState(self.alpha, self.b, self.bm, BCoeff, Bessel_Denom, self.k2, self.maxt, phii, self.PHI2, PHI3, self.phiprime, R3, self.rprime, self.s, S3, self.T, self.v))

This is an example (assuming everything is in the local namespace) of doing it without classes:

import multiprocessing

pool = multiprocessing.Pool(processes=12)    
def runner_function(i):
        phii = phi[i]
        for j in range(sz[2]):
            s = tau[0, i, j, :].reshape(1, n4)
            [R3, PHI3, S3] = meshgrid(rprime, phiprime, s)
            BCoeff = Bessel0(bm * R3)

            return (i, j, ZO_SteadyState(alpha, b, bm, BCoeff, Bessel_Denom, k2, maxt, phii, PHI2, PHI3,
                                         phiprime, R3, rprime, s, S3, T, v))

for i, j, v in pool.imap(runner_function, range(sz[1]):
    SS[0, i, j] = v   
CJR
  • 3,916
  • 2
  • 10
  • 23
  • Could this be done without using classes? I was able to use Numba and compute some portion of my code with njit decorator, but the self-statements will prevent me using no python mode. – NoviceCoder Oct 21 '18 at 20:13
  • 1
    Yes. This is just the easiest way for me to write it. Any function that takes `i` and returns `(i, j, Z0_SteadyState())` will replace `runner.run` in the example I've given. – CJR Oct 21 '18 at 20:41
  • 1
    Also I'm pretty sure that mp.Pool doesn't play nice with numba so this would be an alternative. – CJR Oct 21 '18 at 20:47
  • I see thank you. This is awesome I'll see if I can implement this. When you saymp.pool doesn't play nice with numba are you suggesting I do this instead? Is it not possible to call jit-ted function in the mp.pool? – NoviceCoder Oct 21 '18 at 20:55
  • To be honest, I don't use numba (most of my math is matrix operations and numpy is already fast at that). My understanding was that numba had it's own built-ins for parallelizing and that it was temperamental if you didn't use them. I don't have the experience to give you good advice about it. – CJR Oct 21 '18 at 22:40
  • Sorry, another question. I have 4 cases essentially order 0 to order 3, each case I want runs a multiprocess to speed up the above code. Is it feasible to run four threads that run multi-process array calculations? – NoviceCoder Oct 22 '18 at 01:25