I am looking for a simple example of Python multiprocessing.
I am trying to figure out a workable example of Python multiprocessing. I found an example that factors large numbers into primes. That one worked because there was little input (one large number per core) and a lot of computation (factoring the numbers into primes). My case is different, however: I have a lot of input data on which I perform simple calculations, and I wonder whether there is a simple way to modify the code below so that multiple cores really beat a single core. I am running Python 3.6 on a Windows 10 machine with 4 physical cores and 16 GB of RAM.
Here is my sample code.
import numpy as np
import multiprocessing as mp
import timeit

# comment out the following line to get the version without the queue
queue = mp.Queue()
cores_no = 4


def npv_zcb(bnd_info, cores_no):
    bnds_no = len(bnd_info)
    npvs = []
    for bnd_idx in range(bnds_no):
        nom = bnd_info[bnd_idx][0]
        mat = bnd_info[bnd_idx][1]
        yld = bnd_info[bnd_idx][2]
        npvs.append(nom / ((1 + yld) ** mat))
    if cores_no == 1:
        return npvs
    # comment out the following two lines to get the version without the queue
    else:
        queue.put(npvs)


# generate random attributes of zero coupon bonds
print('Generating random zero coupon bonds...')
bnds_no = 100
bnd_info = np.zeros([bnds_no, 3])
bnd_info[:, 0] = np.random.randint(1, 31, size=bnds_no)
bnd_info[:, 1] = np.random.randint(70, 151, size=bnds_no)
bnd_info[:, 2] = np.random.randint(0, 100, size=bnds_no) / 100
bnd_info = bnd_info.tolist()

# single core
print('Running single core...')
start = timeit.default_timer()
npvs = npv_zcb(bnd_info, 1)
print('   elapsed time: ', timeit.default_timer() - start, ' seconds')

# multiprocessing
print('Running multiprocessing...')
print('   ', cores_no, ' core(s)...')
start = timeit.default_timer()
processes = []
idx = list(range(0, bnds_no, int(bnds_no / cores_no)))
idx.append(bnds_no + 1)
for core_idx in range(cores_no):
    input_data = bnd_info[idx[core_idx]: idx[core_idx + 1]]
    process = mp.Process(target=npv_zcb,
                         args=(input_data, cores_no))
    processes.append(process)
    process.start()
for process_aux in processes:
    process_aux.join()

# comment out the following three lines to get the version without the queue
mylist = []
while not queue.empty():
    mylist.append(queue.get())
print('   elapsed time: ', timeit.default_timer() - start, ' seconds')
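In case it helps to see what I have in mind, here is a rough sketch of a Pool-based variant of the same calculation that I was considering (npv_single is a helper I made up, and the chunksize is just a guess on my part; I do not know whether this is the recommended direction):

import numpy as np
from multiprocessing import Pool


def npv_single(bond):
    # same zero coupon bond NPV formula as in npv_zcb, for a single bond
    nom, mat, yld = bond
    return nom / ((1 + yld) ** mat)


if __name__ == '__main__':  # guard needed on Windows, as far as I understand
    cores_no = 4
    bnds_no = 100
    bnd_info = np.zeros([bnds_no, 3])
    bnd_info[:, 0] = np.random.randint(1, 31, size=bnds_no)
    bnd_info[:, 1] = np.random.randint(70, 151, size=bnds_no)
    bnd_info[:, 2] = np.random.randint(0, 100, size=bnds_no) / 100
    bnd_info = bnd_info.tolist()

    with Pool(processes=cores_no) as pool:
        # chunksize is a guess; the idea is to hand each worker one big slice
        npvs = pool.map(npv_single, bnd_info,
                        chunksize=max(1, bnds_no // cores_no))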
I would be very grateful if anyone could advise me on how to modify the code so that the multi-core run beats the single-core run. I have also noticed that increasing the variable bnds_no to 1,000 leads to a BrokenPipeError. One would expect that more input leads to a longer computation time rather than an error... What is wrong here?
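The only other idea I had was to drain the queue before joining the workers, roughly like this (this would replace the join/collect part of my script; I am only guessing that the join/get order is related to the error):

# guess: collect the results before join(), in case the workers block on
# queue.put() while the parent is waiting in join()
mylist = []
for _ in range(cores_no):
    mylist.append(queue.get())   # blocks until each worker has put its list
for process_aux in processes:
    process_aux.join()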