
I am having a hard time wrapping my head around two things in multiprocessing for Python. I have researched the docs but still don't understand:

  1. how to start and join the processes in a loop (without repeating code; I don't know if I need to do anything special when creating a Process in a loop)
  2. how to add the results from each respective process to a list.

Purpose: To break down a giant list and run each chunk separately for a faster run-time.

from multiprocessing import Process, Queue

queue = Queue()

def filter(aBigList, startV, endV, startP, endPr, minV):

    chunks = list(split(aBigList, 6))

    p1 = Process(target=func1, args=(chunks[0], startP, endPr))
    p2 = Process(target=func1, args=(chunks[1], startP, endPr))
    p3 = Process(target=func1, args=(chunks[2], startP, endPr))
    p4 = Process(target=func1, args=(chunks[3], startP, endPr))
    p5 = Process(target=func1, args=(chunks[4], startP, endPr))
    p6 = Process(target=func1, args=(chunks[5], startP, endPr))

    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p5.start()
    p6.start()

    #wait for all processes to finish
    p1.join()
    p2.join()
    p3.join()
    p4.join()
    p5.join()
    p6.join()

    print(queue)

def func1(subList, startP, endPr):

    for i in subList:
        price = ind.getPrice(i) #returns a price of argument element
        if startP <= float(price) <= endPr:
            print("added")
            queue.put(i)
LeggoMaEggo
    Well, your code is correct and should work fine. Do you see "added" or output? You can check if processes are spawning and "split" function does not stuck, for example. – Alex Pertsev Jul 10 '17 at 22:04
    Ah, yea. You are correct. I didn't see the print! I added a string along with the print and it did end up printing. – LeggoMaEggo Jul 10 '17 at 22:36

1 Answer


Using a Worker Pool

The Python standard library's multiprocessing module provides a class that is very helpful for this use case: multiprocessing.Pool. It manages a number of worker processes for you, and you simply submit units of work for it to complete as you need. Here is your code, adapted to use a pool instead of creating your own processes manually.

import math
import multiprocessing

PROCESS_COUNT = 6

def filter_list(aBigList, startV, endV, startP, endPr, minV):
    # split the list into PROCESS_COUNT roughly equal chunks
    # (note: chunks() takes a chunk *size*, not a chunk count)
    chunk_size = math.ceil(len(aBigList) / PROCESS_COUNT)
    list_chunks = list(chunks(aBigList, chunk_size))

    pool = multiprocessing.Pool(processes=PROCESS_COUNT)

    for chunk in list_chunks:
        pool.apply_async(func1, (chunk, startP, endPr))

    pool.close()
    pool.join()

    while not queue.empty():
        print(queue.get())

Also, the chunks helper can be written simply as follows (borrowed from this answer). Note that n is the size of each chunk, not the number of chunks:

def chunks(l, n):
    # yield successive n-sized slices of l
    for i in range(0, len(l), n):
        yield l[i:i + n]
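As mentioned in the comments, you can also use Pool.map() and have the worker return its results directly, which avoids the shared Queue entirely. Here is a minimal, self-contained sketch of that approach; the names (filter_chunk, the simple numeric price check standing in for ind.getPrice) are illustrative, not part of your original code:

```python
import math
import multiprocessing

PROCESS_COUNT = 6

def chunks(l, n):
    # yield successive n-sized slices of l
    for i in range(0, len(l), n):
        yield l[i:i + n]

def filter_chunk(args):
    # worker: keep only items whose value falls in [start_p, end_p]
    sub_list, start_p, end_p = args
    return [x for x in sub_list if start_p <= float(x) <= end_p]

def filter_list(a_big_list, start_p, end_p):
    # ceiling division so the list is split into at most PROCESS_COUNT chunks
    chunk_size = math.ceil(len(a_big_list) / PROCESS_COUNT)
    work = [(chunk, start_p, end_p) for chunk in chunks(a_big_list, chunk_size)]

    with multiprocessing.Pool(processes=PROCESS_COUNT) as pool:
        # map() blocks until all workers finish and preserves chunk order
        results = pool.map(filter_chunk, work)

    # flatten the per-chunk result lists into one list
    return [item for sublist in results for item in sublist]

if __name__ == "__main__":
    print(filter_list(list(range(20)), 5, 12))  # → [5, 6, 7, 8, 9, 10, 11, 12]
```

Because map() collects the return values for you, there is no queue to drain afterward, and the results come back in the same order as the input chunks.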
foslock
    Alternatively, you can use the `Pool.map()` method and return the values from `func1` instead of using an external Queue object, but both will work. – foslock Jul 10 '17 at 22:09
    No need to reinvent the wheel here - using Pool is the right thing to do. +1 – paisanco Jul 10 '17 at 22:37