23

I have hundreds of thousands of text files that I want to parse in various ways. I want to save the output to a single file without synchronization problems. I have been using a multiprocessing Pool to save time, but I can't figure out how to combine Pool and Queue.

The following code will save the infile name as well as the maximum number of consecutive "x"s in the file. However, I want all processes to save results to the same file, and not to different files as in my example. Any help on this would be greatly appreciated.

import multiprocessing
import re

with open('infilenamess.txt') as f:
    filenames = f.read().splitlines()

def mp_worker(filename):
    with open(filename, 'r') as f:
        text = f.read()
    m = re.findall("x+", text)
    count = len(max(m, key=len))  # length of the longest run of consecutive "x"s
    with open(filename + '_results.txt', 'a') as outfile:
        outfile.write(str(filename) + '|' + str(count) + '\n')

def mp_handler():
    p = multiprocessing.Pool(32)
    p.map(mp_worker, filenames)

if __name__ == '__main__':
    mp_handler()
risraelsen
  • 253
  • 1
  • 2
  • 5

3 Answers

45

Multiprocessing pools implement a queue for you. Just use a pool method that returns the workers' return values to the caller; `imap` works well:

import multiprocessing 
import re

def mp_worker(filename):
    with open(filename) as f:
        text = f.read()
    m = re.findall("x+", text)
    count = len(max(m, key=len))
    return filename, count

def mp_handler():
    p = multiprocessing.Pool(32)
    with open('infilenamess.txt') as f:
        filenames = [line for line in (l.strip() for l in f) if line]
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, filenames):
            # (filename, count) tuples from worker
            f.write('%s: %d\n' % result)

if __name__=='__main__':
    mp_handler()
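
If the order of the output lines doesn't matter, `imap_unordered` is a drop-in variation that yields each result as soon as any worker finishes, and a `chunksize` argument cuts down on parent/child round trips for very long file lists. A minimal sketch of that variation, reusing `mp_worker` and the input/output file names from the code above (`mp_handler_unordered` is just an illustrative name and chunksize=64 an arbitrary choice):

def mp_handler_unordered():
    p = multiprocessing.Pool(32)
    with open('infilenamess.txt') as f:
        filenames = [line for line in (l.strip() for l in f) if line]
    with open('results.txt', 'w') as f:
        # chunksize hands each worker a batch of filenames at a time,
        # reducing the number of trips back to the parent process
        for filename, count in p.imap_unordered(mp_worker, filenames, chunksize=64):
            f.write('%s: %d\n' % (filename, count))
    p.close()
    p.join()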
tdelaney
  • 73,364
  • 6
  • 83
  • 116
  • So, I loop through the results one at a time and write them to the file as they come in? Does that mean that the new worker won't start until each "result" has been written, or will 32 run at a time, but will wait to write? Also, can you explain why you replaced my f.read().splitlines() with [line for line in (l.strip() for l in f) if line]? – risraelsen Oct 28 '14 at 01:04
  • 1
    The 32 processes run in the background and get more filenames in "chunks" as they pass results back to the parent process. Results are passed back immediately, so the parent is doing its work in parallel. It's a bit more efficient to read the file line by line than to read the whole thing and split it later... that's what the list comprehension is for. – tdelaney Oct 28 '14 at 02:15
  • 1
    Excellent! There are tons of answers about this online but none so simple. Kudos to you! – Menezes Sousa Apr 15 '15 at 02:18
  • Thanks a lot! It is not completely clear to me when and where the pool is created and when/where it gets drained to the file. My whole code is this: Parallel(n_jobs=num_cores)(delayed(my_function)(entry) for entry in contents); it takes one line and returns 30 lines. Do I have to store the results of all processes in one list and then write them to the file? If so, it will be slow, because the list keeps growing as we accumulate results and the process slows down. – Reihan_amn May 25 '18 at 07:15
  • If the order of the returned results doesn't map, you could also use `imap_unordered`. – zyxue Jul 28 '18 at 16:43
  • This is a great answer. Saved lots of time, thanks. I encountered a problem while applying this to my data: I have a 5GB `csv`, I use 42 cores to process it and write the result to a single `csv` file, but there is always a garbled row at row 147. I can replicate this with different sub data sets, and the same error always happens at that position. Real output `cad1d1eaf41c40f89a0198c3be80379f,2018-07-6195d4a2c0914f4381442f08797f658f,2018-06-15 01:47:34,1`, desired output `cad1d1eaf41c40f89a0198c3be80379f,2018-07-30 01:47:34(\n)16195d4a2c0914f4381442f08797f658f,2018-06-15 01:47:34,1` – Jia Gao Feb 20 '19 at 03:44
  • @tdelaney I tried the same code to save my result in a single file; the only difference from the code above is that I write the code in the main file itself rather than calling a function mp_handler(). But the process seems to run for a certain time and then pauses without producing a result or terminating. Do you know why that happens? – rozi Aug 07 '23 at 16:24
  • Also, I am saving the file using the pandas dataframe.to_csv() command. – rozi Aug 07 '23 at 16:36
  • @rozi - No. It will be something particular to your code. I suggest you write a new question here on SO with your code (or a slimmed down version demonstrating the same problem). – tdelaney Aug 07 '23 at 17:02
  • Thank you. I will try to do that. I hope it won't be flagged as a duplicate question, though. – rozi Aug 07 '23 at 18:24
13

I took the accepted answer and simplified it for my own understanding of how this works. I am posting it here in case it helps someone else.

import multiprocessing

def mp_worker(number):
    number += 1
    return number

def mp_handler():
    p = multiprocessing.Pool(32)
    numbers = list(range(1000))
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, numbers):
            f.write('%d\n' % result)

if __name__=='__main__':
    mp_handler()
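
As an aside on the `'%d\n' % result` line: this is plain old-style %-formatting, substituting the integer returned by mp_worker into the string. If a worker ever returns None (for example, a code path without an explicit return), that line raises `TypeError: %d format: a real number is required, not NoneType`. A rough sketch of a defensive variant, reusing mp_worker and the import from above (`mp_handler_defensive` is just an illustrative name):

def mp_handler_defensive():
    p = multiprocessing.Pool(32)
    numbers = list(range(1000))
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, numbers):
            if result is None:   # a worker with no explicit return gives None
                continue         # skipping it avoids the TypeError on '%d'
            f.write('%d\n' % result)
    p.close()
    p.join()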
Raj
  • 3,791
  • 5
  • 43
  • 56
  • 1
    Thanks a lot! It is not completely clear to me when and where the pool is created and when/where it gets drained to the file. My whole code is this: Parallel(n_jobs=num_cores)(delayed(my_function)(entry) for entry in contents); it takes one line and returns 30 lines. Do I have to store the results of all processes in one list and then write them to the file? If so, it will be slow, because the list keeps growing as we accumulate results and the process slows down. – Reihan_amn May 25 '18 at 07:14
  • For your f.write statement above: I'm using Python 3.11.3 and when it gets to the last worker in the pool I get `TypeError: %d format: a real number is required, not NoneType`. Is this a newer Python version thing? What does '% result' do? – ikwyl6 Jun 11 '23 at 22:50
3

Here's my approach using a multiprocessing Manager object. The nice thing about this approach is that when execution drops out of the Manager with-block in the run_multi() function, the file-writer queue is shut down automatically, which keeps the code easy to read and spares you any hassle of trying to stop listening on the queue.

from functools import partial
from multiprocessing import Manager, Pool, Queue
from random import randint
import time

def run_multi():
    input = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    with Manager() as manager:
        pool = Pool()  # By default pool will size depending on cores available
        message_queue = manager.Queue()  # Queue for sending messages to file writer listener
        pool.apply_async(file_writer, (message_queue, ))  # Start file listener ahead of doing the work
        pool.map(partial(worker, message_queue=message_queue), input)  # Partial function allows us to use map to divide workload

def worker(input: int, message_queue: Queue):
    message_queue.put(input * 10)
    time.sleep(randint(1, 5))  # Simulate hard work

def file_writer(message_queue: Queue):
    with open("demo.txt", "a") as report:
            while True:
                # Runs until the Manager block in run_multi() exits and the queue proxy is torn down
                report.write(f"Value is: {message_queue.get()}\n")

if __name__ == "__main__":
    run_multi()
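
One caveat: as written, the writer loop only ends when the with-block exits and message_queue.get() raises. A common alternative is to push a sentinel value once the workers are done, so the writer exits (and flushes the file) on its own. A minimal sketch of that variation, reusing `worker` from above; the names `run_multi_with_sentinel` and `file_writer_sentinel` are mine, and it assumes the pool has more than one process so the writer doesn't starve the workers:

def run_multi_with_sentinel():
    input = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    with Manager() as manager:
        pool = Pool()
        message_queue = manager.Queue()
        writer = pool.apply_async(file_writer_sentinel, (message_queue, ))
        pool.map(partial(worker, message_queue=message_queue), input)
        message_queue.put(None)   # sentinel: no more messages are coming
        writer.get()              # wait for the writer to finish and flush the file
        pool.close()
        pool.join()

def file_writer_sentinel(message_queue: Queue):
    with open("demo.txt", "a") as report:
        while True:
            message = message_queue.get()
            if message is None:   # sentinel received, stop listening
                break
            report.write(f"Value is: {message}\n")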
Iain Hunter
  • 4,319
  • 1
  • 27
  • 13