I am trying to parallelize a for loop and write the results to a text file. Here is my code, based on these SO answers:
- Python: Writing to a single file with queue while using multiprocessing Pool
- Python multiprocessing safely writing to a file
# multiprocessing version
import multiprocessing as mp
import time
import numpy as np
from itertools import product
from functools import partial

fn = './rules_res_3.txt'

def worker(arg, q):
    '''crudely simulates a long-running process'''
    w1, w2, w3 = arg
    res = str(w1) + " " + str(w2) + " " + str(w3)
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the queue and writes them to the file'''
    with open(fn, 'a') as f:
        while True:
            m = q.get()
            print(m)
            if m == 'kill':
                break
            f.write(m + '\n')
            # f.flush()

def run():
    # must use a Manager queue here, or it will not work
    # manager = mp.Manager()
    # q = manager.Queue()
    # pool = mp.Pool(mp.cpu_count() + 2)

    # put the listener to work first
    # watcher = pool.apply_async(listener, (q,))

    # fire off the workers
    r1, r2, r3 = np.arange(0.9, 1.5, 0.1), np.arange(0.9, 1.1, 0.1), np.arange(0, 1, 0.1)
    params = product(r1, r2, r3)
    # pool.map(partial(worker, q=q), [arg for arg in params])

    with mp.Manager() as manager:
        pool = mp.Pool()  # by default the pool sizes itself to the available cores
        message_queue = manager.Queue()  # queue for sending messages to the file-writing listener
        pool.apply_async(listener, (message_queue,))  # start the file listener before doing the work
        pool.map(partial(worker, q=message_queue), params)  # partial() lets us use map() to divide the workload

    # now we are done, kill the listener
    # q.put('kill')
    # pool.close()
    # pool.join()

run()
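For reference, here is a minimal, self-contained variant of the same listener/worker pattern that I believe should run to completion. The shutdown handling is my own guess based on the linked answers: the 'kill' sentinel is sent while the Manager (and therefore its queue) is still alive, watcher.get() waits for the listener to finish, the pool reserves two extra processes so the listener cannot starve the workers, and the `if __name__ == '__main__'` guard is needed on platforms that spawn rather than fork processes.

import multiprocessing as mp
from itertools import product
from functools import partial

import numpy as np

fn = './rules_res_3.txt'  # same output file as above

def worker(arg, q):
    '''format one parameter triple and push it onto the shared queue'''
    w1, w2, w3 = arg
    res = str(w1) + " " + str(w2) + " " + str(w3)
    q.put(res)
    return res

def listener(q):
    '''drain the queue into the file until the kill sentinel arrives'''
    with open(fn, 'a') as f:
        while True:
            m = q.get()
            if m == 'kill':
                break
            f.write(m + '\n')
            f.flush()  # make progress visible while the run is ongoing

def run():
    r1 = np.arange(0.9, 1.5, 0.1)
    r2 = np.arange(0.9, 1.1, 0.1)
    r3 = np.arange(0, 1, 0.1)
    params = product(r1, r2, r3)

    with mp.Manager() as manager:
        q = manager.Queue()
        pool = mp.Pool(mp.cpu_count() + 2)  # +2 keeps a slot free for the listener
        watcher = pool.apply_async(listener, (q,))  # start the listener first
        pool.map(partial(worker, q=q), params)      # blocks until all workers are done
        q.put('kill')   # sent while the manager's queue is still alive
        watcher.get()   # wait for the listener to finish writing
        pool.close()
        pool.join()

if __name__ == '__main__':
    run()

If I have read the linked answers correctly, the ordering is the important part: the sentinel must be sent, and the listener joined, before the Manager block exits, otherwise the queue backing the listener disappears while it is still blocked in q.get().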
Problems that I am having: