I am working on segmenting large ctypes arrays and processing them in parallel. I am receiving the error below and believe it is because one segment of the array finishes processing before another. I tried using process.join() to make the first set of processes wait, but that is not working. Any ideas?
Exception RuntimeError: RuntimeError('cannot join current thread',) in <Finalize object, dead> ignored
Using:
....
with closing(multiprocessing.Pool(initializer=init(array))) as p:
    del array  # Since the array is now stored in a shared array, destroy the array ref for memory reasons
    step = y // cores
    if step != 0:
        jobs = []
        for i in range(0, y, step):
            process = p.Process(target=stretch, args=(shared_arr, slice(i, i+step)), kwargs=options)
            jobs.append(process)
            process.start()
        for j in jobs:
            j.join()
del jobs
del process
Update:
# Create a ctypes array
array = ArrayConvert.SharedMemArray(array)
# Create a global of options
init_options(options)  # options is a dict
with closing(multiprocessing.Pool(initializer=init(array))) as p:
    del array  # Since the array is now stored in a shared array, destroy the array ref for memory reasons
    step = y // cores
    if step != 0:
        for i in range(0, y, step):
            # Package all the options into a global dictionary
            p.map_async(stretch, [slice(i, i+step)])
            #p.apply_async(stretch, args=(shared_arr, slice(i, i+step)), kwargs=options)
p.join()

def init_options(options_):
    global kwoptions
    kwoptions = options_
The function that I am passing to map_async is stored in a different module, so I am struggling to get the global kwoptions passed to that function. Passing globals around between modules like this does not seem right (it feels unpythonic). Is this the way to pass kwargs through map_async?
Should I be reworking the multiprocessing using something different (apply or Process)?
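For reference, one alternative to the module-level global that I have seen suggested is to bind the keyword arguments with functools.partial before handing the function to map_async. Below is a minimal sketch of that idea, not my real code: the stretch signature (scale, offset), the plain list DATA standing in for the shared array, and the run helper are all hypothetical names made up for illustration.

```python
import multiprocessing
from contextlib import closing
from functools import partial

# Hypothetical stand-in for the shared ctypes array.
DATA = list(range(8))


def stretch(segment, scale=1, offset=0):
    """Hypothetical stand-in for the real stretch(): transform one slice of DATA."""
    return [value * scale + offset for value in DATA[segment]]


def run(options, step=4):
    """Map stretch over fixed-size slices, with options bound as keyword arguments."""
    segments = [slice(i, i + step) for i in range(0, len(DATA), step)]
    # partial() freezes the kwargs, so no global dictionary is needed.
    worker = partial(stretch, **options)
    with closing(multiprocessing.Pool(processes=2)) as pool:
        result = pool.map_async(worker, segments)
        chunks = result.get()  # block until every segment is done
    pool.join()  # valid here because closing() has already called pool.close()
    return chunks


if __name__ == "__main__":
    print(run({"scale": 2, "offset": 1}))
```

Here all slices are submitted in a single map_async call rather than one call per slice, and the partial object is pickled and sent to the workers along with each task, so the function can live in another module without needing access to a global.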