1

I would like to do the following:

  • read data from a csv file
  • process each line of said csv (assuming this is a long network operation)
  • write to another file the result

I have tried gluing together this and this answers, but with scarce success. The code for the second queue never gets called, therefore there is no writing to disk happening. How do I let the process know there is a second queue?

Note that I am not necessary a fan of multiprocessing. If async/await works better, I am all for it.

My code so far

import multiprocessing
import os
import time

in_queue = multiprocessing.Queue()
out_queue = multiprocessing.Queue()

def worker_main(in_queue, out_queue):
    print (os.getpid(), "working")
    while True:
        item = in_queue.get(True)
        print (os.getpid(), "got", item)
        time.sleep(1) #long network processing
        print (os.getpid(), "done", item)
        # put the processed items to be written to disl
        out_queue.put("processed:" + str(item))


pool = multiprocessing.Pool(3, worker_main,(in_queue,out_queue))

for i in range(5): # let's assume this is the file reading part
    in_queue.put(i)

with open('out.txt', 'w') as file:

    while not out_queue.empty():
        try:
            value = q.get(timeout = 1)
            file.write(value + '\n')
        except Exception as qe:
            print ("Empty Queue or dead process")
martineau
  • 119,623
  • 25
  • 170
  • 301
meto
  • 3,425
  • 10
  • 37
  • 49

1 Answers1

3

First issue I encountered trying to execute your code was:

An attempt has been made to start a new process before the current process has finished 
its bootstrapping phase. This probably means that you are not using fork to start your 
child processes and you have forgotten to use the proper idiom in the main module

I had to wrap any module scope instructions in the if __name__ == '__main__': idiom. Read more here.

As your goal is to iterate over the lines of a file, Pool.imap() seems like a good fit. The imap() docs refer to the map() docs, the difference being that imap() lazily pulls the next items from the iterable (which in your case will be the csv file) which will be beneficial if your csv file is large. So from the map() docs:

This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks.

imap() returns an iterator so you can then iterate over the results produced by the process workers to do what you have to do with them (in the case of your example it is to write the results to a file).

Here is a working example:

import multiprocessing
import os
import time


def worker_main(item):
    print(os.getpid(), "got", item)
    time.sleep(1) #long network processing
    print(os.getpid(), "done", item)
    # put the processed items to be written to disl
    return "processed:" + str(item)


if __name__ == '__main__':
    with multiprocessing.Pool(3) as pool:
        with open('out.txt', 'w') as file:
            # range(5) simulating a 5 row csv file.
            for proc_row in pool.imap(worker_main, range(5)):
                file.write(proc_row + '\n')

# printed output:
# 1368 got 0
# 9228 got 1
# 12632 got 2
# 1368 done 0
# 1368 got 3
# 9228 done 1
# 9228 got 4
# 12632 done 2
# 1368 done 3
# 9228 done 4

out.txt looks like this:

processed:0
processed:1
processed:2
processed:3
processed:4

Notice I've not had to use any queues either.

SuperShoot
  • 9,880
  • 2
  • 38
  • 55
  • 1
    Thanks, that's very helpful. I run my code on a jupiter notebook, so that's probability why I didn't get the same error – meto Mar 26 '19 at 19:10