
I've written a multiprocessing script in Python, largely based on the answer to this question: Python multiprocessing safely writing to a file. The workers are triggered fine and every job returns the right response, but the listener only ever receives the first message, as if the while loop were ignored entirely. I'm pretty new to Python and could use some expert advice in resolving this issue.

Code:

import multiprocessing as mp
import time
import datetime
import csv
from audio import prepare

fileName = "uuid_file_mapping_" + datetime.datetime.today().strftime('%Y%m%d%H%M%S') + ".csv"

def worker(files, q):
    res = prepare(files)
    # This line is printed successfully for every response
    print("MESSAGE ABOUT TO BE LOGGED: ", res)
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''
    with open(fileName, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Process ID","Source Files","Archive","Preview","Logo"])
        while True:
            m = q.get()
            # This line is printed only once!!!!
            print("MESSAGE RECEIVED BY LISTENER: ", m)
            if not m:
                print("Message is empty. Skipping to the next message")
            elif m["kill"]:
                print("Completed processing all jobs")
                break
            elif m["error"]:
                print("Error in job id: " + m["processId"] + ". Error is: " + m["error"])
            else:
                if m["warn"]:
                    print("Job id: " + m["processId"] + " has the following warnings: " + m["warn"])
                row = list()
                row.append(m["key1"])
                row.append(m["key2"])
                row.append(m["key3"])
                row.append(m["key4"])
                row.append(m["key5"])
                print("Row to be written is: " + row)
                writer.writerow(row)

def getFilenamesFromCSV(csvFile):
    filenames = list()
    if not csvFile:
        return None
    with open(csvFile, 'r', newline='') as f:
        reader = csv.reader(f)
        for row in reader:
            if row[0]:
                filenames.append(list(row[0].split(",")))
    print(filenames)
    return filenames

def main():
    #must use Manager queue here, or will not work
    manager = mp.Manager()
    q = manager.Queue()    
    pool = mp.Pool(mp.cpu_count() + 2)

    #put listener to work first
    watcher = pool.apply_async(listener, (q,))

    #fire off workers
    filesToProcess = getFilenamesFromCSV("input.csv")
    jobs = []
    for files in filesToProcess:
        job = pool.apply_async(worker, (files, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs: 
        job.get()

    #now we are done, kill the listener
    killCommand = dict()
    killCommand["kill"] = "KILL"
    q.put(killCommand)
    pool.close()
    pool.join()


if __name__ == "__main__":
    main()
  • that's an unnecessarily complicated way of arranging the code, I'd suggest using `pool.imap_unordered` and moving most of the code from `listener` into your `main` function – Sam Mason Jan 01 '20 at 16:06 (a sketch of this approach follows these comments)
  • Your `q = manager.Queue()` is empty, therefore this `m = q.get()` will throw a `queue.empty` error. – stovfl Jan 01 '20 at 16:44
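
A minimal sketch of the `pool.imap_unordered` approach suggested in the first comment, reusing `getFilenamesFromCSV` from the question and assuming `prepare()` returns a dict with the same keys (`key1` through `key5`, `error`) used there:

import multiprocessing as mp
import datetime
import csv
from audio import prepare

fileName = "uuid_file_mapping_" + datetime.datetime.today().strftime('%Y%m%d%H%M%S') + ".csv"

def main():
    filesToProcess = getFilenamesFromCSV("input.csv")  # as defined in the question
    with mp.Pool(mp.cpu_count()) as pool, open(fileName, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Process ID","Source Files","Archive","Preview","Logo"])
        # imap_unordered yields each result as soon as any worker finishes,
        # so the main process writes the rows directly and no listener is needed
        for res in pool.imap_unordered(prepare, filesToProcess):
            if res and not res.get("error"):
                writer.writerow([res["key1"], res["key2"], res["key3"], res["key4"], res["key5"]])

if __name__ == "__main__":
    main()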

1 Answer


I ended up rewriting the code without a queue, and this version works fine. The queue-based implementation only worked when the worker processes responded quickly, and it doesn't appear to be timeout-related: I got no errors, and setting explicit timeouts made no difference.

import multiprocessing as mp
import time
import datetime
import csv
from audio import prepare

fileName = "uuid_file_mapping_" + datetime.datetime.today().strftime('%Y%m%d%H%M%S') + ".csv"

# This is where the actual processing is done
def worker(files):
    return prepare(files)

# Read filenames from input CSV for processing
def getFilenamesFromCSV(csvFile):
    filenames = list()
    if not csvFile:
        return None
    with open(csvFile, 'r', newline='') as f:
        reader = csv.reader(f)
        for row in reader:
            if row[0]:
                filenames.append(list(row[0].split(",")))
    print(filenames)
    return filenames

# Decides what, if any, needs to be written to the output file
def whatShouldIWriteToOutputCSV(audioPrepareResult):
    if not audioPrepareResult:
        return None
    elif audioPrepareResult.get("error", None):
        print("Error in job id: " + audioPrepareResult["processId"] + ". Error is: " + audioPrepareResult["error"])
        return None
    else:
        row = list()
        row.append(audioPrepareResult["key1"])
        row.append(audioPrepareResult["key2"])
        row.append(audioPrepareResult["key3"])
        row.append(audioPrepareResult["key4"])
        row.append(audioPrepareResult["key5"])
        return row

def main():

    pool = mp.Pool(mp.cpu_count())

    #fire off workers
    filesToProcess = getFilenamesFromCSV("input.csv")
    jobs = []
    for files in filesToProcess:
        jobs.append(pool.apply_async(worker, (files,)))

    with open(fileName, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Process ID","Source Files","Archive","Preview","Logo"])
        for job in jobs:
            result = whatShouldIWriteToOutputCSV(job.get())
            if result:
                writer.writerow(result)

    pool.close()
    pool.join()


if __name__ == "__main__":
    main()
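
As an aside, the most likely reason the original queue version stalled after one message: `prepare()` presumably returns a dict without a "kill" key, so the listener's `m["kill"]` lookup raised a KeyError on the first real message. Because the listener ran via `pool.apply_async` and its AsyncResult was never fetched with `.get()`, that exception was captured and silently discarded, leaving the listener dead while the workers kept running. A defensive version of the original listener using `dict.get` (which returns None for a missing key instead of raising) avoids this; a minimal sketch, assuming the same message shape as in the question:

def listener(q):
    '''Listens for messages on the q and writes them to the CSV file.'''
    with open(fileName, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Process ID","Source Files","Archive","Preview","Logo"])
        while True:
            m = q.get()
            print("MESSAGE RECEIVED BY LISTENER: ", m)
            if not m:
                print("Message is empty. Skipping to the next message")
            elif m.get("kill"):   # .get() returns None for a missing key instead of raising KeyError
                print("Completed processing all jobs")
                break
            elif m.get("error"):
                print("Error in job id: " + m["processId"] + ". Error is: " + m["error"])
            else:
                if m.get("warn"):
                    print("Job id: " + m["processId"] + " has the following warnings: " + m["warn"])
                row = [m["key1"], m["key2"], m["key3"], m["key4"], m["key5"]]
                print("Row to be written is:", row)   # pass the list as a separate argument; "str" + list raises TypeError
                writer.writerow(row)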