
I am trying to parallelize a for loop and write results to a text file. Here is my code based on these SO answers:

#multiprocessing version
import multiprocessing as mp
import time
import numpy as np
from itertools import product
from functools import partial

fn = './rules_res_3.txt'

def worker(arg, q):
    '''stupidly simulates long running process'''
    w1, w2, w3 = arg
    res = str(w1) + " " + str(w2) + " " + str(w3)
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''

    with open(fn, 'a') as f:
        while True:
            m = q.get()
            print(m)
            if m == 'kill':
                break
            f.write(m + '\n')
            # f.flush()

def run():
    #must use Manager queue here, or will not work
    # manager = mp.Manager()
    # q = manager.Queue()    
    # pool = mp.Pool(mp.cpu_count() + 2)

    #put listener to work first
    # watcher = pool.apply_async(listener, (q,))

    #fire off workers
    r1, r2, r3 = np.arange(0.9, 1.5, 0.1), np.arange(0.9, 1.1, 0.1), np.arange(0, 1, 0.1)
    params = product(r1, r2, r3)
    

    # pool.map(partial(worker, q=q), [arg for arg in params])
    with mp.Manager() as manager:
        pool = mp.Pool()  # By default pool will size depending on cores available
        message_queue = manager.Queue()  # Queue for sending messages to file writer listener
        pool.apply_async(listener, (message_queue, ))  # Start file listener ahead of doing the work
        pool.map(partial(worker, q=message_queue), params)  # Partial function allows us to use map to divide workload


    #now we are done, kill the listener
    # q.put('kill')
    # pool.close()
    # pool.join()

run()

Problems that I am having:

  • print in the listener function doesn't print anything

  • the program doesn't write to './rules_res_3.txt' at all. I am not even sure whether the workers or the listener are running properly at all.

JP Zhang
  • What is your OS, and what output do you get from `python xxx.py`? You should protect the "entry point" of the program with `if __name__ == '__main__'`, per the [Programming guidelines](https://docs.python.org/3.7/library/multiprocessing.html#programming-guidelines) – HALF9000 Jun 20 '22 at 03:57
  • @HALF9000 I am using Windows and running this code in a Jupyter notebook. – JP Zhang Jun 20 '22 at 04:03

1 Answer


The problem here is that you never allow the loop to end. You have commented out the code that sends the "kill" sentinel, which is what `listener` uses to break out of its loop, so it blocks forever on `q.get()`. Because the loop never exits, the `with open(fn, 'a')` block never closes the file, and the buffered writes are never flushed to disk.

This works:

def run():
    r1, r2, r3 = np.arange(0.9, 1.5, 0.1), np.arange(0.9, 1.1, 0.1), np.arange(0, 1, 0.1)
    params = product(r1, r2, r3)
    
    with mp.Manager() as manager:
        pool = mp.Pool()  # By default pool will size depending on cores available
        message_queue = manager.Queue()  # Queue for sending messages to file writer listener
        pool.apply_async(listener, (message_queue, ))  # Start file listener ahead of doing the work
        pool.map(partial(worker, q=message_queue), params)  # Partial function allows us to use map to divide workload

        #now we are done, kill the listener
        message_queue.put('kill')
        pool.close()
        pool.join()
Tim Roberts
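
A closing note on the comment thread above: on Windows (and from a Jupyter notebook), `multiprocessing` uses the "spawn" start method, so every worker process re-imports the main module. The call to `run()` therefore needs to sit behind an `if __name__ == '__main__'` guard, exactly as HALF9000 suggests. A minimal sketch, assuming the definitions from the question:

# Guarding the entry point: with the "spawn" start method, each child
# process re-imports this module, so an unguarded top-level run() would
# be executed again in every worker and recursively spawn new pools.
if __name__ == '__main__':
    run()

Two further notes: functions defined directly in a notebook cell may not be importable by spawned workers, so on Windows it is safer to put `worker`, `listener`, and `run` in a .py file; and uncommenting `f.flush()` in `listener` makes results appear in the file as each message arrives, rather than only when the file is closed.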