-1

I am looking for a simple example of python multiprocessing.

I am trying to put together a workable example of Python multiprocessing. I found an example that factors large numbers into primes. It worked well because there was little input (one large number per core) and a lot of computing (factoring the numbers).

However, my use case is different: I have a lot of input data on which I perform simple calculations. I wonder if there is a simple way to modify the code below so that multiple cores really beat a single core. I am running Python 3.6 on a Win10 machine with 4 physical cores and 16 GB RAM.

Here comes my sample code.

import numpy as np
import multiprocessing as mp
import timeit

# comment the following line to get version without queue
queue = mp.Queue()
cores_no = 4


def npv_zcb(bnd_info, cores_no):

     bnds_no = len(bnd_info)
     npvs = []

     for bnd_idx in range(bnds_no):

         nom = bnd_info[bnd_idx][0]
         mat = bnd_info[bnd_idx][1]
         yld = bnd_info[bnd_idx][2]

         npvs.append(nom / ((1 + yld) ** mat))

     if cores_no == 1:
         return npvs
     # comment the following two lines to get version without queue
     else:
         queue.put(npvs)

# generate random attributes of zero coupon bonds

print('Generating random zero coupon bonds...')


bnds_no = 100

bnd_info = np.zeros([bnds_no, 3])
bnd_info[:, 0] = np.random.randint(1, 31, size=bnds_no)
bnd_info[:, 1] = np.random.randint(70, 151, size=bnds_no)
bnd_info[:, 2] = np.random.randint(0, 100, size=bnds_no) / 100
bnd_info = bnd_info.tolist()

# single core
print('Running single core...')
start = timeit.default_timer()
npvs = npv_zcb(bnd_info, 1)
print('   elapsed time: ', timeit.default_timer() - start, ' seconds')

# multiprocessing
print('Running multiprocessing...')
print('   ', cores_no, ' core(s)...')
start = timeit.default_timer()

processes = []

idx = list(range(0, bnds_no, int(bnds_no / cores_no)))
idx.append(bnds_no + 1)

for core_idx in range(cores_no):
     input_data = bnd_info[idx[core_idx]: idx[core_idx + 1]]

     process = mp.Process(target=npv_zcb,
                          args=(input_data, cores_no))
     processes.append(process)
     process.start()

for process_aux in processes:
     process_aux.join()

# comment the following three lines to get version without queue
mylist = []
while not queue.empty():
     mylist.append(queue.get())

print('   elapsed time: ', timeit.default_timer() - start, ' seconds')

I would be very grateful if anyone could advise me how to modify the code so that the multi-core run beats the single-core run. I have also noticed that increasing the variable bnds_no to 1,000 leads to a BrokenPipeError. One would expect that increasing the amount of input would lead to a longer computation time rather than an error... What is wrong here?

Macky
  • On my Windows 7 system with Python 3.7.2 the code currently in your question raises a `RuntimeError` because it doesn't have an `if __name__ == '__main__':` to protect its entry point. See the section titled "Safe importing of main module" of the multiprocessing [Programming guidelines](https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming). Fixing this problem isn't an answer to your question, but posting code that actually works might help you get a good one... – martineau Feb 07 '19 at 20:16
  • I wouldn't expect `multiprocessing` to speed things up in this case—borne out by the results I obtained after fixing your code—because, paraphrasing an article I recently read: An improvement due to parallel processing only makes sense if tasks are "CPU-bound" where the majority of the task is spent in the CPU in contrast to I/O bound tasks (i.e. tasks processing data from disk) — which is not true of your `npv_zcb()` function. – martineau Feb 07 '19 at 20:49
  • @martineau Do you mean your comment in general or related to python and its implementation of multiprocessing? Few years ago I managed to implement something similar in C++ and there the speed was scaling with number of cores quite nicely... – Macky Feb 07 '19 at 20:56
  • I meant it generally. Multiprocessing _can_ speed things up in any programming language when applied to certain kinds of problems, depending on the kind of processing being done and where most of the time is spent. In this particular case, the overhead of passing the data between the processes takes a lot more time than the simple work that `npv_zcb()` does. – martineau Feb 07 '19 at 21:04
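
The point about transfer overhead in the comments above can be checked with a rough measurement. The following is a minimal sketch, not part of the original thread: the helper name `transfer_cost` and the sample data are made up for illustration, and `pickle` is used only as a stand-in for the serialisation that happens when data crosses a process boundary (for example through a `multiprocessing.Queue`).

```python
import pickle
import timeit


def npv_zcb(bonds):
    """Value zero-coupon bonds given as (nominal, maturity, yield) rows."""
    return [nom / ((1 + yld) ** mat) for nom, mat, yld in bonds]


def transfer_cost(bonds):
    """Serialise and deserialise the input once (hypothetical helper).

    This approximates the work needed to move the data to another process.
    """
    return pickle.loads(pickle.dumps(bonds))


if __name__ == '__main__':
    # Made-up sample data: distinct rows so pickle cannot reuse references.
    bonds = [[100.0 + i % 30, 70.0 + i % 80, (i % 100) / 100]
             for i in range(100_000)]

    compute = timeit.timeit(lambda: npv_zcb(bonds), number=5) / 5
    transfer = timeit.timeit(lambda: transfer_cost(bonds), number=5) / 5

    print('compute : {:.4f} s'.format(compute))
    print('transfer: {:.4f} s'.format(transfer))
```

If the transfer time is not clearly smaller than the compute time, splitting this particular calculation across processes is unlikely to pay off.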

6 Answers

1

This doesn't directly answer your question but if you were using RxPy for reactive Python programming you could check out their small example on multiprocessing: https://github.com/ReactiveX/RxPY/tree/release/v1.6.x#concurrency

Seems a bit easier to manage concurrency with ReactiveX/RxPy than trying to do it manually.

  • Thx for the suggestion. Unfortunately I work in a corporation with a rigid ICT department. It is pure sci-fi to expect our ICT to install a new package just for me :(. – Macky Feb 07 '19 at 20:01
  • @Macky are they using apt-get to install python packages or just have a whitelist of packages? only suggested it because concurrent coding became way easier for me at least with RxPy (RX.js is pretty popular for JavaScript) –  Feb 07 '19 at 20:27
  • At work we are forced to use Win10. I can only install packages that are approved and signed by our ICT. They have very rigid procedures for everything - just for illustration - HDD upgrade was a 6 month "project" for them... I will try to have look anyhow during the weekend. However, even if it worked I have very little chance I will persuade them to approve the package... – Macky Feb 07 '19 at 21:00
1

The BrokenPipeError is not caused by the larger input as such; it is caused by a race condition that arises because queue.empty() and queue.get() are called as separate steps.

You rarely see it with smaller inputs because the queue items are processed quickly and the race rarely occurs, but with larger datasets the chance of hitting it increases.

Even with smaller inputs, try running your script multiple times, maybe 10 to 15 times, and you will see the BrokenPipeError occur.

One solution to this is to pass a sentinel value to the queue which you can use to test if all the data in the queue has been processed.

Try modifying your code to something like this

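import multiprocessing as mp


def consume(q):
    """Collect items from q until the None sentinel is seen."""
    results = []
    while True:
        data = q.get()            # blocks until an item is available
        if data is not None:
            results.append(data)  # <process the data here>
        else:
            q.put(None)           # re-post the sentinel for other consumers
            return results


if __name__ == '__main__':
    q = mp.Queue()
    for item in ([1.0, 2.0], [3.0]):  # <put the data in the queue>
        q.put(item)
    q.put(None)                   # sentinel: no more data
    print(consume(q))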
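
When the number of workers is known, as in the question, an alternative to a sentinel is to take exactly one result per worker with a blocking get(), and to do so before joining. The following is a minimal sketch under that assumption; the names `worker`, `collect` and `run` are made up for illustration and are not from the question.

```python
import multiprocessing as mp


def worker(core_idx, bonds, out_q):
    """Value one chunk of (nominal, maturity, yield) rows and post the result."""
    npvs = [nom / ((1 + yld) ** mat) for nom, mat, yld in bonds]
    out_q.put((core_idx, npvs))  # tag with core_idx so order can be restored


def collect(out_q, n_workers):
    """Take exactly one result per worker and return them in worker order."""
    # get() blocks until a result arrives, so no empty() check is needed.
    results = [out_q.get() for _ in range(n_workers)]
    return [npvs for _, npvs in sorted(results)]


def run(chunks):
    """Start one process per chunk, drain the queue, then join."""
    out_q = mp.Queue()
    procs = [mp.Process(target=worker, args=(i, chunk, out_q))
             for i, chunk in enumerate(chunks)]
    for p in procs:
        p.start()
    results = collect(out_q, len(procs))  # drain first ...
    for p in procs:
        p.join()                          # ... then join
    return results


if __name__ == '__main__':
    print(run([[(100.0, 1, 0.0)], [(50.0, 2, 0.0)]]))
```

One caveat of this sketch: if a worker dies before posting its result, collect() waits forever, so real code would likely pass a timeout to get().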
Rohit
0

OK, so I removed the queue-related parts from the code to see if that gets rid of the BrokenPipeError (I updated the original code above to indicate what should be commented out). Unfortunately it did not help.

I tested the code on my personal PC with Linux (Ubuntu 18.10, python 3.6.7). Quite surprisingly the code behaves differently on the two systems. On Linux the version without queue runs without problems; the version with queue runs forever. On Windows there is no difference - I always end up with BrokenPipeError.

PS: In some other post (No multiprocessing print outputs (Spyder)) I found that there might be some problem with multiprocessing when using Spyder editor. I experienced exactly the same problem on Windows machine. So, not all examples in official documentation work as expected...
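
One likely source of the platform difference described above is the way child processes are started: Windows only offers the 'spawn' start method, while Linux has traditionally defaulted to 'fork'. The following minimal sketch reports which method is in use; the function name `describe_start_method` is made up for illustration.

```python
import multiprocessing as mp


def describe_start_method():
    """Report how this platform creates child processes."""
    method = mp.get_start_method()
    # With 'spawn' and 'forkserver' the child imports the main module again,
    # so any unguarded top-level code runs once more in every child.
    # With 'fork' the child starts as a copy of the parent instead.
    reimports_main = method != 'fork'
    return method, reimports_main


if __name__ == '__main__':
    method, reimports_main = describe_start_method()
    print('start method:', method)
    print('main module re-imported in children:', reimports_main)
```

When the main module is re-imported, code such as process creation has to sit under an `if __name__ == '__main__':` guard, as noted in the comments on the question.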

Macky
0

This doesn't answer your question. I'm only posting it to illustrate what I said in the comments about when multiprocessing might be able to speed processing up.

In the code below, which is based on yours, I've added a REPEAT constant that makes npv_zcb() repeat its computations that many times to simulate heavier CPU use. Changing this constant's value generally slows down or speeds up the single-core processing much more than it does the multiprocessing part; in fact, it hardly affects the latter at all.

import numpy as np
import multiprocessing as mp
import timeit


np.random.seed(42)  # Generate same set of random numbers for testing.

REPEAT = 10  # Number of times to repeat computations performed in npv_zcb.


def npv_zcb(bnd_info, queue):

    npvs = []

    for _ in range(REPEAT):  # To simulate more computations.

        npvs = []  # Keep only the results of the latest pass.

        for nom, mat, yld in bnd_info:
            npvs.append(nom / ((1 + yld) ** mat))

    if queue is not None:
        queue.put(npvs)  # Child process: send results back to the parent.
    else:
        return npvs


if __name__ == '__main__':

    print('Generating random zero coupon bonds...')
    print()

    bnds_no = 100
    cores_no = 4

    # generate random attributes of zero coupon bonds

    bnd_info = np.zeros([bnds_no, 3])
    bnd_info[:, 0] = np.random.randint(1, 31, size=bnds_no)
    bnd_info[:, 1] = np.random.randint(70, 151, size=bnds_no)
    bnd_info[:, 2] = np.random.randint(0, 100, size=bnds_no) / 100
    bnd_info = bnd_info.tolist()

    # single core
    print('Running single core...')
    start = timeit.default_timer()
    npvs = npv_zcb(bnd_info, None)
    print('   elapsed time: {:.6f} seconds'.format(timeit.default_timer() - start))

    # multiprocessing
    print()
    print('Running multiprocessing...')
    print('  ', cores_no, ' core(s)...')
    start = timeit.default_timer()

    queue = mp.Queue()
    processes = []

    # chunk boundaries: cores_no contiguous slices that cover every bond
    idx = [core_idx * bnds_no // cores_no
           for core_idx in range(cores_no + 1)]

    for core_idx in range(cores_no):
        input_data = bnd_info[idx[core_idx]: idx[core_idx + 1]]

        process = mp.Process(target=npv_zcb, args=(input_data, queue))
        processes.append(process)
        process.start()

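    # Collect one result per process *before* joining: a child that has put
    # data on a queue may not exit until that data has been consumed.
    mylist = []
    for _ in processes:
        mylist.extend(queue.get())

    for process in processes:
        process.join()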

    print('   elapsed time: {:.6f} seconds'.format(timeit.default_timer() - start))
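
The manual chunking and queue handling above can also be expressed with a process pool. The following is a minimal sketch rather than a drop-in replacement: the helper names `split_evenly`, `npv_chunk` and `parallel_npv` and the sample data are made up for illustration.

```python
import multiprocessing as mp


def split_evenly(items, n_chunks):
    """Split items into n_chunks contiguous slices that cover every element."""
    size = len(items)
    bounds = [i * size // n_chunks for i in range(n_chunks + 1)]
    return [items[bounds[i]:bounds[i + 1]] for i in range(n_chunks)]


def npv_chunk(bonds):
    """Value one chunk of (nominal, maturity, yield) rows."""
    return [nom / ((1 + yld) ** mat) for nom, mat, yld in bonds]


def parallel_npv(bonds, workers):
    """Value all bonds with a pool of worker processes, keeping input order."""
    with mp.Pool(processes=workers) as pool:
        # map() sends one chunk to each task and returns results in order.
        parts = pool.map(npv_chunk, split_evenly(bonds, workers))
    return [npv for part in parts for npv in part]


if __name__ == '__main__':
    sample = [(100.0, m, 0.05) for m in range(1, 11)]  # made-up sample bonds
    print(parallel_npv(sample, workers=4))
```

The pool takes care of starting the workers and collecting results, so there is no queue to drain. The data still has to be sent to and from the workers, so the overhead discussed above still applies.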
martineau
0

OK - so finally I found a partial answer. I could not get multiprocessing to work on my Windows machine: the following code runs fine on Ubuntu (Ubuntu 19.04 & python 3.7) but not on Windows (Win10 & python 3.6). Hope it helps others...

import pandas as pd
import numpy as np
import csv
import multiprocessing as mp
import timeit


def npv_zcb(bnd_file, delimiter=','):
    """
    Michal Mackanic
    06/05/2019 v1.0

    Load bond positions from a .csv file, value the bonds and save results
    back to a .csv file.

    inputs:
        bnd_file: str
            full path to a .csv file with bond positions
        delimiter: str
            delimiter to be used in .csv file
    outputs:
        a .csv file with additional field npv.

    dependencies:

    example:
        npv_zcb('C:\\temp\\bnd_aux.csv', ',')
    """

    # load the input file as a dataframe
    bnd_info = pd.read_csv(bnd_file,
                           sep=delimiter,
                           quoting=2,  # csv.QUOTE_NONNUMERIC
                           doublequote=True,
                           low_memory=False)

    # convert dataframe into list of dictionaries
    bnd_info = bnd_info.to_dict(orient='records')

    # get number of bonds in the file
    bnds_no = len(bnd_info)

    # go bond by bond
    for bnd_idx in range(bnds_no):
        mat = bnd_info[bnd_idx]['maturity']
        nom = bnd_info[bnd_idx]['nominal']
        yld = bnd_info[bnd_idx]['yld']
        bnd_info[bnd_idx]['npv'] = nom / ((1 + yld) ** mat)

    # convert list of dictionaries back to dataframe and save it as .csv file
    bnd_info = pd.DataFrame(bnd_info)
    bnd_info.to_csv(bnd_file,
                    sep=delimiter,
                    quoting=csv.QUOTE_NONNUMERIC,
                    quotechar='"',
                    index=False)

    return


def main(cores_no, bnds_no, path, delimiter):

    # generate random attributes of zero coupon bonds
    print('Generating random zero coupon bonds...')
    bnd_info = np.zeros([bnds_no, 3])
    bnd_info[:, 0] = np.random.randint(1, 31, size=bnds_no)
    bnd_info[:, 1] = np.random.randint(70, 151, size=bnds_no)
    bnd_info[:, 2] = np.random.randint(0, 100, size=bnds_no) / 100
    bnd_info = zip(bnd_info[:, 0], bnd_info[:, 1], bnd_info[:, 2])
    bnd_info = [{'maturity': mat,
                 'nominal': nom,
                 'yld': yld} for mat, nom, yld in bnd_info]
    bnd_info = pd.DataFrame(bnd_info)

    # save bond positions into a .csv file
    bnd_info.to_csv(path + 'bnd_aux.csv',
                    sep=delimiter,
                    quoting=csv.QUOTE_NONNUMERIC,
                    quotechar='"',
                    index=False)

    # prepare one .csv file per core
    print('Preparing input files...')

    idx = list(range(0, bnds_no, int(bnds_no / cores_no)))
    idx.append(bnds_no + 1)

    for core_idx in range(cores_no):
        # save bond positions into a .csv file
        file_name = path + 'bnd_aux_' + str(core_idx) + '.csv'
        bnd_info_aux = bnd_info[idx[core_idx]: idx[core_idx + 1]]
        bnd_info_aux.to_csv(file_name,
                            sep=delimiter,
                            quoting=csv.QUOTE_NONNUMERIC,
                            quotechar='"',
                            index=False)

    # SINGLE CORE
    print('Running single core...')

    start = timeit.default_timer()

    # evaluate bond positions
    npv_zcb(path + 'bnd_aux.csv', delimiter)

    print('   elapsed time: ', timeit.default_timer() - start, ' seconds')

    # MULTIPLE CORES
    if __name__ == '__main__':

        # spread calculation among several cores
        print('Running multiprocessing...')
        print('   ', cores_no, ' core(s)...')

        start = timeit.default_timer()

        processes = []

        # go core by core
        print('        spreading calculation among processes...')
        for core_idx in range(cores_no):
            # run calculations
            file_name = path + 'bnd_aux_' + str(core_idx) + '.csv'
            process = mp.Process(target=npv_zcb,
                                 args=(file_name, delimiter))
            processes.append(process)
            process.start()

        # wait till every process is finished
        print('        waiting for all processes to finish...')
        for process in processes:
            process.join()

    print('   elapsed time: ', timeit.default_timer() - start, ' seconds')

main(cores_no=2,
     bnds_no=1000000,
     path='/home/macky/',
     delimiter=',')
Macky
0

After some help from a colleague, I was able to produce a simple piece of code that was actually running as expected. I was almost there - my code needed a few subtle (yet crucial) modifications. To run the code, open anaconda prompt, type python -m idlelib, open the file and run it.

import pandas as pd
import numpy as np
import csv
import multiprocessing as mp
import timeit


def npv_zcb(core_idx, bnd_file, delimiter=','):
    """
    Michal Mackanic
    06/05/2019 v1.0

    Load bond positions from a .csv file, value the bonds and save results
    back to a .csv file.

    inputs:
        bnd_file: str
            full path to a .csv file with bond positions
        delimiter: str
            delimiter to be used in .csv file
    outputs:
        a .csv file with additional field npv.

    dependencies:

    example:
        npv_zcb('C:\\temp\\bnd_aux.csv', ',')
    """

    # core idx
    print('   npv_zcb() starting on core ' + str(core_idx))

    # load the input file as a dataframe
    bnd_info = pd.read_csv(bnd_file,
                           sep=delimiter,
                           quoting=2,  # csv.QUOTE_NONNUMERIC
                           header=0,
                           doublequote=True,
                           low_memory=False)

    # convert dataframe into list of dictionaries
    bnd_info = bnd_info.to_dict(orient='records')

    # get number of bonds in the file
    bnds_no = len(bnd_info)

    # go bond by bond
    for bnd_idx in range(bnds_no):
        mat = bnd_info[bnd_idx]['maturity']
        nom = bnd_info[bnd_idx]['nominal']
        yld = bnd_info[bnd_idx]['yld']
        bnd_info[bnd_idx]['npv'] = nom / ((1 + yld) ** mat)

    # convert list of dictionaries back to dataframe and save it as .csv file
    bnd_info = pd.DataFrame(bnd_info)
    bnd_info.to_csv(bnd_file,
                    sep=delimiter,
                    quoting=csv.QUOTE_NONNUMERIC,
                    quotechar='"',
                    index=False)

    # core idx
    print('   npv_zcb() finished on core ' + str(core_idx))

    # everything OK
    return True


def main(cores_no, bnds_no, path, delimiter):

    if __name__ == '__main__':
        mp.freeze_support()

        # generate random attributes of zero coupon bonds
        print('Generating random zero coupon bonds...')
        bnd_info = np.zeros([bnds_no, 3])
        bnd_info[:, 0] = np.random.randint(1, 31, size=bnds_no)
        bnd_info[:, 1] = np.random.randint(70, 151, size=bnds_no)
        bnd_info[:, 2] = np.random.randint(0, 100, size=bnds_no) / 100
        bnd_info = zip(bnd_info[:, 0], bnd_info[:, 1], bnd_info[:, 2])
        bnd_info = [{'maturity': mat,
                     'nominal': nom,
                     'yld': yld} for mat, nom, yld in bnd_info]
        bnd_info = pd.DataFrame(bnd_info)

        # save bond positions into a .csv file
        bnd_info.to_csv(path + 'bnd_aux.csv',
                        sep=delimiter,
                        quoting=csv.QUOTE_NONNUMERIC,
                        quotechar='"',
                        index=False)

        # prepare one .csv file per core
        print('Preparing input files...')

        # chunk boundaries: cores_no contiguous slices that cover every bond
        idx = [core_idx * bnds_no // cores_no
               for core_idx in range(cores_no + 1)]

        for core_idx in range(cores_no):
            # save bond positions into a .csv file
            file_name = path + 'bnd_aux_' + str(core_idx) + '.csv'
            bnd_info_aux = bnd_info[idx[core_idx]: idx[core_idx + 1]]
            bnd_info_aux.to_csv(file_name,
                                sep=delimiter,
                                quoting=csv.QUOTE_NONNUMERIC,
                                quotechar='"',
                                index=False)

        # SINGLE CORE
        print('Running single core...')

        start = timeit.default_timer()

        # evaluate bond positions
        npv_zcb(1, path + 'bnd_aux.csv', delimiter)

        print('   elapsed time: ', timeit.default_timer() - start, ' seconds')

        # MULTIPLE CORES
        # spread calculation among several cores
        print('Running multiprocessing...')
        print('   ', cores_no, ' core(s)...')

        start = timeit.default_timer()

        processes = []

        # go core by core
        print('        spreading calculation among processes...')
        for core_idx in range(cores_no):
            # run calculations
            file_name = path + 'bnd_aux_' + str(core_idx) + '.csv'
            process = mp.Process(target=npv_zcb,
                                 args=(core_idx, file_name, delimiter))
            processes.append(process)
            process.start()

        # wait till every process is finished
        print('        waiting for all processes to finish...')
        for process in processes:
            process.join()

        print('   elapsed time: ', timeit.default_timer() - start, ' seconds')


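# guard the entry point so that child processes importing this module
# do not call main() again
if __name__ == '__main__':
    main(cores_no=2,
         bnds_no=1000000,
         path='C:\\temp\\',
         delimiter=',')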
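
The same one-file-per-process idea can be sketched with only the standard library, with the `__main__` guard placed around the entry point. This is a minimal illustration rather than the code above: it uses `csv` instead of pandas, and the names `value_file` and `write_sample`, the temporary folder and the sample data are made up.

```python
import csv
import multiprocessing as mp
import os
import tempfile


def value_file(path):
    """Read bonds from a .csv file, add an npv column and write the file back."""
    with open(path, newline='') as fh:
        rows = list(csv.DictReader(fh))
    for row in rows:
        nom = float(row['nominal'])
        mat = float(row['maturity'])
        yld = float(row['yld'])
        row['npv'] = nom / ((1 + yld) ** mat)
    with open(path, 'w', newline='') as fh:
        writer = csv.DictWriter(
            fh, fieldnames=['nominal', 'maturity', 'yld', 'npv'])
        writer.writeheader()
        writer.writerows(rows)
    return len(rows)


def write_sample(path, n_rows):
    """Write n_rows made-up bonds to path (hypothetical test data)."""
    with open(path, 'w', newline='') as fh:
        writer = csv.writer(fh)
        writer.writerow(['nominal', 'maturity', 'yld'])
        for i in range(n_rows):
            writer.writerow([100.0, 1 + i % 30, 0.05])


def main(cores_no=2, rows_per_file=1000):
    """Create one input file per process and value the files in parallel."""
    folder = tempfile.mkdtemp()
    files = [os.path.join(folder, 'bnd_aux_{}.csv'.format(i))
             for i in range(cores_no)]
    for path in files:
        write_sample(path, rows_per_file)
    processes = [mp.Process(target=value_file, args=(path,))
                 for path in files]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    return files


if __name__ == '__main__':
    mp.freeze_support()  # only has an effect in frozen Windows executables
    print(main())
```

Because each process reads and writes its own file, nothing is passed back through a queue, which sidesteps the queue-related problems discussed earlier in the thread.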
halfer
Macky
  • Great, I would like to test it on my machine. @Macky can you share the csv file to run it? Thanks in advance, JL – Bayes May 27 '20 at 23:39