
When the array (`data`) has more than 10,000 elements, not all processes finish (see the last line, `print('complete')`). With arrays of up to 2,000 elements this code works fine. I think the problem is with the queue: without `result_queue.put([i,j])`, all processes complete properly. Can anybody help me with this part of the code?

    def finder(start, end, proc, result_queue, lock):
        global data
        i = start
        while i <= end:
            el = data[i]
            j = -1
            for el1 in data:
                j = j + 1
                s1 = SequenceMatcher(None, el, el1)
                s1_val = s1.ratio()
                if s1_val > 0.9:
                    result_queue.put([i, j])
            i = i + 1
        print('end')


    if __name__ == '__main__':
        multiprocessing.freeze_support()
        result_queue = multiprocessing.Queue()
        allProcesses = []
        data = r.keys()
        print(len(data))
        parts = 8
        part = int(len(data) / parts)
        i = 0
        lock = multiprocessing.Lock()

        while i < parts:
            p = multiprocessing.Process(target=finder, args=(part * i, part * i + part, i, result_queue, lock))
            print('init', part * i, part * i + part, i)
            allProcesses.append(p)
            p.daemon = True
            p.start()
            i = i + 1
            print('started process', i)
        i = 0

        for p in allProcesses:
            p.join()
            print('complete')
  • Is there any difference when run without `p.daemon = True`? – J.J. Hakala Jul 03 '16 at 16:49
  • no difference((( only 3 of 8 completed – Alexander Vedmed' Jul 03 '16 at 16:53
  • There might be a size limit for the queue. If there is no consumer for the queue, it will get full and `queue.put()` will block. At least [this](http://stackoverflow.com/questions/5900985/multiprocessing-queue-maxsize-limit-is-32767) suggests that there is a limit. – J.J. Hakala Jul 03 '16 at 16:57
  • length of my queue is lower than 32767; the real length is shorter than 500-600, and the code executed without errors – Alexander Vedmed' Jul 03 '16 at 17:02
  • Can you create a simplified MCVE (http://stackoverflow.com/help/mcve) that demonstrates the problem? We can't run the code that you have provided because we don't have `SequenceMatcher`, and `r` is not defined. – Warren Weckesser Jul 04 '16 at 02:32

1 Answer


Short answer: use a `multiprocessing.Manager` to create the Queue:

    m = multiprocessing.Manager()
    result_queue = m.Queue()

A bit more detailed answer: calling `.Queue()` on a `multiprocessing.Manager()` returns an

    <class 'multiprocessing.managers.AutoProxy[Queue]'>

instance, which can be shared safely among workers.
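
A quick way to verify this (a minimal check, not from the original answer; the exact repr may vary slightly between Python versions):

    import multiprocessing

    if __name__ == '__main__':
        m = multiprocessing.Manager()
        q = m.Queue()
        # Prints the proxy class, e.g. <class 'multiprocessing.managers.AutoProxy[Queue]'>
        print(type(q))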

Here is a complete runnable example:

    import multiprocessing


    def finder(start, end, proc, result_queue, lock):
        # Each worker puts its results on the shared Manager queue.
        for i in range(start, end + 1):
            result_queue.put((i,))
        print('end %s' % proc)


    r = {i: i for i in range(100000)}


    def main():
        multiprocessing.freeze_support()
        allProcesses = []
        data = r.keys()
        print(len(data))
        parts = 8
        part = int(len(data) / parts)
        i = 0
        lock = multiprocessing.Lock()
        m = multiprocessing.Manager()
        result_queue = m.Queue()
        while i < parts:
            p = multiprocessing.Process(target=finder, args=(part * i, part * i + part, i, result_queue, lock))
            print('init', part * i, part * i + part, i)
            p.daemon = False
            p.start()
            i = i + 1
            print('started process', i)
            allProcesses.append(p)

        for p in allProcesses:
            print('join', p)
            p.join()
            print('complete')


    if __name__ == '__main__':
        main()
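
The example above never actually reads from the queue. As a follow-up sketch (my addition, not part of the original answer), the parent could collect the results at the end of `main()`, after the joins; this is safe with a Manager queue because the items live in the manager's server process:

    # Sketch: drain the Manager queue once all workers have been joined,
    # so no further items can arrive.
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())
    print(len(results), 'results collected')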

If you change `m.Queue()` back to `multiprocessing.Queue()`, you will see the old behavior again: a child process that has put items on a plain `multiprocessing.Queue` does not exit until its buffered items have been flushed to the underlying pipe, so joining the workers before draining the queue can deadlock once enough data has been queued.
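
For completeness, here is a sketch of the alternative fix that keeps a plain `multiprocessing.Queue` (my addition, not from the original answer): have each worker put a sentinel such as `None` when it finishes, and drain the queue in the parent *before* joining.

    # Sketch only: assumes each worker ends with result_queue.put(None),
    # and that the names (parts, allProcesses) match the example above.
    results = []
    finished = 0
    while finished < parts:
        item = result_queue.get()
        if item is None:        # one worker has signalled completion
            finished += 1
        else:
            results.append(item)
    for p in allProcesses:
        p.join()                # safe now: the queue has been drained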

Yoav Glazner