I saw somewhere a hint on how to process a large dataset (say lines of text) faster with the multiprocessing module, something like:
... (form batch_set = nump batches [= lists of lines to process], batch_set
is a list of lists of strings (batches))
nump = len(batch_set)
output = mp.Queue()
processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i])) for i in range(nump)]
for p in processes:
p.start()
for p in processes:
p.join()
results = sorted([output.get() for p in processes])
... (do something with the processed outputs, ex print them in order,
given that each proc_lines function returns a couple (i, out_batch))
However, when i run the code with a small number of lines/batch it works fine [ex: './code.py -x 4:10' for nump=4 and numb=10 (lines/batch)] while after a certain number of lines/batch is hangs [ex: './code.py -x 4:4000'] and when i interrupt it i see a traceback hint about a _wait_for_tstate_lock and the system threading library. It seems that the code does not reach the last shown code line above...
I provide the code below, in case somebody needs it to answer why this is happening and how to fix it.
#!/usr/bin/env python3
import sys
import multiprocessing as mp
def fabl(numb, nump):
'''
Form And Batch Lines: form nump[roc] groups of numb[atch] indexed lines
'<idx> my line here' with indexes from 1 to (nump x numb).
'''
ret = []
idx = 1
for _ in range(nump):
cb = []
for _ in range(numb):
cb.append('%07d my line here' % idx)
idx += 1
ret.append(cb)
return ret
def proc_lines(i, output, rows_in):
ret = []
for row in rows_in:
row = row[0:8] + 'some other stuff\n' # replacement for the post-idx part
ret.append(row)
output.put((i,ret))
return
def mp_proc(batch_set):
'given the batch, disperse it to the number of processes and ret the results'
nump = len(batch_set)
output = mp.Queue()
processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i])) for i in range(nump)]
for p in processes:
p.start()
for p in processes:
p.join()
print('waiting for procs to complete...')
results = sorted([output.get() for p in processes])
return results
def write_set(proc_batch_set, fout):
'write p[rocessed]batch_set'
for _, out_batch in proc_batch_set:
for row in out_batch:
fout.write(row)
return
def main():
args = sys.argv
if len(args) < 2:
print('''
run with args: -x [ NumProc:BatchSize ]
( ex: '-x' | '-x 4:10' (default values) | '-x 4:4000' (hangs...) )
''')
sys.exit(0)
numb = 10 # suppose we need this number of lines/batch : BatchSize
nump = 4 # number of processes to use. : NumProcs
if len(args) > 2 and ':' in args[2]: # use another np:bs
nump, numb = map(int, args[2].split(':'))
batch_set = fabl(numb, nump) # proc-batch made in here: nump (groups) x numb (lines)
proc_batch_set = mp_proc(batch_set)
with open('out-min', 'wt') as fout:
write_set(proc_batch_set, fout)
return
if __name__ == '__main__':
main()