3

I am entirely new to parallel computing, in fact, to numerical methods. I am trying to solve a differential equation using python solve_ivp of the following form:

y''(x) + (a^2 + x^2)y(x) = 0
y(0)=1
y'(0)=0
x=(0,100)

I want to solve for a range of a and write a file as a[i] y[i](80).

The original Equation is quite complicated but essentially the structure is the same as defined above. I have used a for loop and it is taking a lot of time for computation. Searching online I came across this beautiful website and found this question and related answer that preciously may solve the problem I am facing.

I tried the original code provided in the solution; however, the output produced is not properly sorted. I mean, the second column is not in proper order.

q1    a1    Y1
q1    a2    Y3
q1    a4    Y4
q1    a3    Y3
q1    a5    Y5
...

I have even tried with one loop with one parameter, but the same issue still remains. Below is my code with the same multiprocessing method but with solve_ivp

import numpy as np
import scipy.integrate
import multiprocessing as mp
from scipy.integrate import solve_ivp


def fun(t, y):
    # replace this function with whatever function you want to work with
    #    (this one is the example function from the scipy docs for odeint)
    theta, omega = y
    dydt = [omega, -a*omega - q*np.sin(theta)]
    return dydt

#definitions of work thread and write thread functions
tspan = np.linspace(0, 10, 201)


def run_thread(input_queue, output_queue):
    # run threads will pull tasks from the input_queue, push results into output_queue
    while True:
        try:
            queueitem = input_queue.get(block = False)
            if len(queueitem) == 3:
                a, q, t = queueitem
                sol = solve_ivp(fun, [tspan[0], tspan[-1]], [1, 0], method='RK45', t_eval=tspan)
                F = 1 + sol.y[0].T[157]
                output_queue.put((q, a, F))
        except Exception as e:
            print(str(e))
            print("Queue exhausted, terminating")
            break

def write_thread(queue):    
    # write thread will pull results from output_queue, write them to outputfile.txt
    f1 = open("outputfile.txt", "w")
    while True:
        try:
            queueitem = queue.get(block = False)
            if queueitem[0] == "TERMINATE":
                f1.close()
                break
            else:
                q, a, F = queueitem                
                print("{}  {} {} \n".format(q, a, F))            
                f1.write("{}  {} {} \n".format(q, a, F))            
        except:
            # necessary since it will throw an error whenever output_queue is empty
            pass

# define time point sequence            
t = np.linspace(0, 10, 201)

# prepare input and output Queues
mpM = mp.Manager()
input_queue = mpM.Queue()
output_queue = mpM.Queue()

# prepare tasks, collect them in input_queue
for q in np.linspace(0.0, 4.0, 100):
    for a in np.linspace(-2.0, 7.0, 100):
        # Your computations as commented here will now happen in run_threads as defined above and created below
        # print('Solving for q = {}, a = {}'.format(q,a))
        # sol1 = scipy.integrate.odeint(fun, [1, 0], t, args=( a, q))[..., 0]
        # print(t[157])
        # F = 1 + sol1[157]    
        input_tupel = (a, q, t)
        input_queue.put(input_tupel)

# create threads
thread_number = mp.cpu_count()
procs_list = [mp.Process(target = run_thread , args = (input_queue, output_queue)) for i in range(thread_number)]         
write_proc = mp.Process(target = write_thread, args = (output_queue,))

# start threads
for proc in procs_list:
    proc.start()
write_proc.start()

# wait for run_threads to finish
for proc in procs_list:
    proc.join()

# terminate write_thread
output_queue.put(("TERMINATE",))
write_proc.join()

Kindly let me know what is wrong in the multiprocessing so that I can learn a bit about multiprocessing in python in the process. Also, I would much appreciate if anyone let me know about the most elegant/efficient way(s) for handling such a computation in python. Thanks

new_kid
  • 76
  • 4
  • Please tell more detail – Minh-Thanh Hoang Jan 14 '20 at 06:21
  • 2
    @Minh-ThanhHoang, in the outputfile.txt file, the data is not written in proper order. The output is supposed to be written in ascending order in (q,a) but sometimes that is not maintained if I use multiple cores. – new_kid Jan 14 '20 at 06:29
  • Why would you *expect* concurrent processes to preserve the order of the data? – Davis Herring Jan 14 '20 at 07:49
  • @DavisHerring, I don't expect it to preserve the order, in fact, to be absolutely honest I can't say I have completely understood what is going on in the multiprocessing. This is my first-time encounter with such processes. However, I want it so that I don't need any post-processing before plotting the data. – new_kid Jan 14 '20 at 08:09
  • @new_kid: Is the output so large that you can’t just buffer and sort it? – Davis Herring Jan 14 '20 at 08:12
  • @DavisHerring, I will be generating many large outputs, thus it will be inconvenient to post-process each of them every time. I have seen some parallel codes written in C++ where such an issue has been properly addressed. Then why can't they be sorted in python! – new_kid Jan 14 '20 at 08:52

1 Answers1

1

What you want is something of an online sort. In this case, you know the order in which you want to present results (i.e., the input order), so just accumulate the outputs in a priority queue and pop elements from it when they match the next key you’re expecting.

A trivial example without showing the parallelism:

import heapq
def sort_follow(pairs,ref):
  """
  Sort (a prefix of) pairs so as to have its first components
  be the elements of (sorted) ref in order.
  Uses memory proportional to the disorder in pairs.
  """
  heap=[]
  pairs=iter(pairs)
  for r in ref:
    while not heap or heap[0][0]!=r:
      heapq.heappush(heap,next(pairs))
    yield heapq.heappop(heap)

The virtue of this approach—the reduced memory footprint—is probably irrelevant for small results of just a few floats, but it’s easy enough to apply.

Davis Herring
  • 36,443
  • 4
  • 48
  • 76
  • Thank you for your answer. Unfortunately, at the level of my expertise, I may not be able to implement it soon. It would be very helpful if you kindly provide a working example(even with one for loop) which I can test. – new_kid Jan 15 '20 at 05:30