I've written a multiprocessing script in Python, largely based on the answer to this question: "Python multiprocessing safely writing to a file". The workers are triggered fine and every job returns the right response, but the listener only ever receives the first message, almost as if the while loop is being ignored. I'm fairly new to Python and could use some expert advice on resolving this.
Code:
import multiprocessing as mp
import time
import datetime
import csv

from audio import prepare

fileName = "uuid_file_mapping_" + datetime.datetime.today().strftime('%Y%m%d%H%M%S') + ".csv"

def worker(files, q):
    res = prepare(files)
    # This line is printed successfully for every response
    print("MESSAGE ABOUT TO BE LOGGED: ", res)
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''
    with open(fileName, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Process ID", "Source Files", "Archive", "Preview", "Logo"])
        while True:
            m = q.get()
            # This line is printed only once!!!!
            print("MESSAGE RECEIVED BY LISTENER: ", m)
            if not m:
                print("Message is empty. Skipping to the next message")
            elif m["kill"]:
                print("Completed processing all jobs")
                break
            elif m["error"]:
                print("Error in job id: " + m["processId"] + ". Error is: " + m["error"])
            else:
                if m["warn"]:
                    print("Job id: " + m["processId"] + " has the following warnings: " + m["warn"])
                row = list()
                row.append(m["key1"])
                row.append(m["key2"])
                row.append(m["key3"])
                row.append(m["key4"])
                row.append(m["key5"])
                print("Row to be written is: " + row)
                writer.writerow(row)

def getFilenamesFromCSV(csvFile):
    filenames = list()
    if not csvFile:
        return None
    with open(csvFile, 'r', newline='') as f:
        reader = csv.reader(f)
        for row in reader:
            if row[0]:
                filenames.append(list(row[0].split(",")))
    print(filenames)
    return filenames

def main():
    # must use Manager queue here, or will not work
    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count() + 2)

    # put listener to work first
    watcher = pool.apply_async(listener, (q,))

    # fire off workers
    filesToProcess = getFilenamesFromCSV("input.csv")
    jobs = []
    for files in filesToProcess:
        job = pool.apply_async(worker, (files, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    # now we are done, kill the listener
    killCommand = dict()
    killCommand["kill"] = "KILL"
    q.put(killCommand)

    pool.close()
    pool.join()

if __name__ == "__main__":
    main()
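
In case it's relevant: my understanding is that because listener runs as a pool task via apply_async, any exception it raises is held back until the AsyncResult is retrieved, so I would never see a traceback on the console even if the loop died. Below is a minimal, self-contained sketch of that behaviour as I understand it (failing_listener is a hypothetical stand-in, not my real listener):

import multiprocessing as mp
import traceback

def failing_listener(q):
    # Stand-in for my listener: reads one message, then raises,
    # the way an error inside the while loop would.
    m = q.get()
    print("got:", m)
    raise TypeError("boom")

def main():
    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(2)
    watcher = pool.apply_async(failing_listener, (q,))
    q.put("first")
    q.put("second")  # never printed: the task has already died silently
    try:
        # The exception from failing_listener is only re-raised here,
        # when the AsyncResult is fetched.
        watcher.get(timeout=10)
    except Exception:
        traceback.print_exc()
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()

If that is the right mental model, is there a better way to structure the real script so a failure in the listener doesn't just silently stop the CSV writing?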