I am filtering huge text files using the multiprocessing module. The code basically opens a text file, works on it, then closes it.

Thing is, I'd like to be able to launch it successively on multiple text files. Hence, I tried to add a loop, but for some reason it doesn't work (although the code works on each file individually). I believe this is an issue with:

    if __name__ == '__main__':    

However, I am looking for something else. I tried to create Launcher and LauncherCount files like this:

    LauncherCount.py:

    def setLauncherCount(n):
        global LauncherCount
        LauncherCount = n

and,

    Launcher.py:

    import os
    import LauncherCount

    LauncherCount.setLauncherCount(0)

    os.system("OrientedFilterNoLoop.py")

    LauncherCount.setLauncherCount(1)

    os.system("OrientedFilterNoLoop.py")

    ...

I import LauncherCount.py, and use LauncherCount.LauncherCount as my loop index.

Of course, this doesn't work either, as it edits the variable LauncherCount.LauncherCount locally, so the change is not seen by the copy of LauncherCount imported in OrientedFilterNoLoop.py.

Is there any way to globally edit a variable in an imported file? Or is there another way to do this? What I need is to run the code multiple times, changing one value each time, and apparently without using a loop.
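For context, `os.system` starts a separate process, and the Python interpreter it launches imports its own fresh copy of every module, so a value set in the launcher never reaches it. A minimal sketch of one alternative, assuming the filter script can read its file index from the command line; `build_command`, `parse_index` and `launch_in_sequence` are hypothetical names, not part of the code above:

```python
import subprocess
import sys


def build_command(script, index):
    """Build the argument list that runs `script` with `index` as its only argument."""
    return [sys.executable, script, str(index)]


def parse_index(argv):
    """Read the file index from the command line; default to 0 when it is absent."""
    return int(argv[1]) if len(argv) > 1 else 0


def launch_in_sequence(script, count):
    """Run `script` once per index, waiting for each run to finish before the next."""
    for index in range(count):
        # check_call blocks until the child exits and raises on a non-zero status.
        subprocess.check_call(build_command(script, index))
```

Under these assumptions, the filter script would use `parse_index(sys.argv)` where it currently reads `LauncherCount.LauncherCount`, and the launcher would call `launch_in_sequence("OrientedFilterNoLoop.py", n)` with `n` set to the number of input files.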

Thanks!

Edit: Here is my main code if necessary. Sorry for the bad style ...

import multiprocessing
import config
import time
import LauncherCount

class Filter:
    """ Filtering methods """

    def __init__(self):
        print("launching methods")

    #   Return the list: [Latitude,Longitude]  (elements are floating point numbers)
    def LatLong(self,line):

        comaCount = []
        comaCount.append(line.find(','))
        comaCount.append(line.find(',',comaCount[0] + 1))
        comaCount.append(line.find(',',comaCount[1] + 1))
        Lat = line[comaCount[0] + 1 : comaCount[1]]
        Long = line[comaCount[1] + 1 : comaCount[2]]

        try:
            return [float(Lat) , float(Long)]
        except ValueError:
            return [0,0]

    #   Return a boolean:
    #   - True if the Lat/Long is within the Lat/Long rectangle defined by:
    #           tupleFilter = (minLat,maxLat,minLong,maxLong)
    #   - False if not
    def LatLongFilter(self,LatLongList , tupleFilter) :
        if (tupleFilter[0] <= LatLongList[0] <= tupleFilter[1] and
                tupleFilter[2] <= LatLongList[1] <= tupleFilter[3]):
            return True
        else:
            return False

    def writeLine(self,key,line):
        filterDico[key][1].write(line)



def filteringProcess(dico):

    myFilter = Filter()

    while True:
        try:
            currentLine = readFile.readline()
        except ValueError:
            break
        if len(currentLine) ==0:                    # Breaks at the end of the file
            break
        if len(currentLine) < 35:                    # Deletes wrong lines (too short)
            continue
        LatLongList = myFilter.LatLong(currentLine)
        for key in dico:
            if myFilter.LatLongFilter(LatLongList,dico[key][0]):
                myFilter.writeLine(key,currentLine)


###########################################################################
                # Main
###########################################################################

# Open read files:
readFile = open(config.readFileList[LauncherCount.LauncherCount][1], 'r')

# Generate writing files:
pathDico = {}
filterDico = config.filterDico

# Create outputs
for key in filterDico:
    output_Name = (config.readFileList[LauncherCount.LauncherCount][0][:-4]
                   + '_' + key + '.log')
    pathDico[output_Name] = config.writingFolder + output_Name
    filterDico[key] = [filterDico[key],open(pathDico[output_Name],'w')]


p = []
CPUCount = multiprocessing.cpu_count()
CPURange = range(CPUCount)

startingTime = time.localtime()

if __name__ == '__main__':
    ### Create and start processes:
    for i in CPURange:
        p.append(multiprocessing.Process(target = filteringProcess , 
                                            args = (filterDico,)))
        p[i].start()

    ### Kill processes:
    while True:
        if [p[i].is_alive() for i in CPURange] == [False for i in CPURange]:
            readFile.close()
            for key in config.filterDico:
                config.filterDico[key][1].close()
                print(key,"is Done!")
                endTime = time.localtime()
            break

    print("Process started at:",startingTime)
    print("And ended at:",endTime)
kevad
  • "Thing is, I'd like to be able to launch it successively on multiple text files." This seems to be what a Queue is for. Why aren't you using a Queue for this? – S.Lott Jan 25 '12 at 21:42
  • If I understand correctly, queues are used to exchange values and information between processes? What I want is not to extend the processes so they can work on successive files, but rather to wait for the processes to finish, and then create a new bunch of processes with the same method on the new input file. – kevad Jan 25 '12 at 22:45
  • That seems backwards. Why not have a bunch of processes all reading queues waiting for file names. One process finishes, it puts the file name into the queue for the next process. That way the synchronization is easy. Read name from queue; do work; write name to another queue. Why aren't you doing that? – S.Lott Jan 26 '12 at 00:08

1 Answer

To process groups of files in sequence while working on files within a group in parallel:

#!/usr/bin/env python
from multiprocessing import Pool

def work_on(args):
    """Process a single file."""
    i, filename = args
    print("working on %s" % (filename,))
    return i

def files():
    """Generate input filenames to work on."""
    #NOTE: you could read the file list from a file, get it using glob.glob, etc
    yield "inputfile1"
    yield "inputfile2"

def process_files(pool, filenames):
    """Process filenames using pool of processes.

    Wait for results.
    """
    for result in pool.imap_unordered(work_on, enumerate(filenames)):
        #NOTE: in general the files won't be processed in the original order
        print(result) 

def main():
   p = Pool()

   # to do "successive" multiprocessing
   for filenames in [files(), ['other', 'bunch', 'of', 'files']]:
       process_files(p, filenames)

if __name__=="__main__":
   main()

Each call to process_files() starts only after the previous one has completed, i.e., the files from different calls to process_files() are not processed in parallel.
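If instead each large file should be spread over all the workers before the next file is opened, a variant along the following lines may help. It is only a sketch under assumptions: the `FILTERS` rectangles, the helper names and the `_<key>.log` naming are hypothetical, and the latitude and longitude are assumed to be the second and third comma-separated fields, as in the question's code. The workers only classify lines; the parent process is the only one that opens and writes the output files.

```python
import sys
from multiprocessing import Pool

# Hypothetical filters: name -> (minLat, maxLat, minLong, maxLong)
FILTERS = {"north": (45.0, 90.0, -180.0, 180.0),
           "south": (-90.0, 45.0, -180.0, 180.0)}


def lat_long(line):
    """Return (latitude, longitude) from the 2nd and 3rd comma-separated fields."""
    fields = line.split(",")
    try:
        return float(fields[1]), float(fields[2])
    except (IndexError, ValueError):
        return 0.0, 0.0


def matching_keys(line):
    """Return the line together with the names of the filters it falls inside."""
    lat, lon = lat_long(line)
    keys = [key for key, (lat0, lat1, lon0, lon1) in FILTERS.items()
            if lat0 <= lat <= lat1 and lon0 <= lon <= lon1]
    return line, keys


def output_name(filename, key):
    """Derive an output filename from the input filename and the filter name."""
    return "%s_%s.log" % (filename, key)


def filter_file(pool, filename):
    """Filter one input file; workers classify lines, the parent writes them."""
    outputs = {key: open(output_name(filename, key), "w") for key in FILTERS}
    try:
        with open(filename) as lines:
            # imap iterates over the file object rather than loading it into
            # a list, and yields results in input order.
            for line, keys in pool.imap(matching_keys, lines, chunksize=1000):
                for key in keys:
                    outputs[key].write(line)
    finally:
        for handle in outputs.values():
            handle.close()


def main(filenames):
    with Pool() as pool:
        # One file at a time: the next file starts only after this one is done.
        for filename in filenames:
            filter_file(pool, filename)


if __name__ == "__main__":
    if len(sys.argv) > 1:
        main(sys.argv[1:])
```

Sending every line back to the parent costs some inter-process traffic, but it avoids several processes sharing the same open file handles.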

jfs
  • I don't use the same output files for each input file, so basically every time a file is done, I close all my output files, create a bunch of new ones, and launch a new bunch of processes on the new input and new outputs. How can I do this with your solution? – kevad Jan 25 '12 at 22:49
  • @user1154967: generate output filenames based on input filenames e.g., `output_filename = filename+'.output'` – jfs Jan 25 '12 at 23:04
  • This will also process the files in parallel, while I am looking for a way to do successive multiprocessing, for the safety of the files and because of cache memory, as the input files are about 35GB and I read them from a DB server. – kevad Jan 25 '12 at 23:17
  • @user1154967: I've updated the answer to show how to do "successive" multiprocessing. – jfs Jan 26 '12 at 00:46
  • I needed time to understand this code but I think this is it! I'll implement it and test it soon, once the cores are available. For the moment I hard coded the "successivity" by creating multiple versions of the main code with different read file indexes: I launch them using os.system("link") one after the other. Besides helping me with this issue, I think you also gave me a solution for something else I was looking for: a way of not keeping too much data in cache. From what I read about "yield", this is exactly what it is for, as I just need to go over one file once? Thank you! – kevad Jan 26 '12 at 18:29
  • @user1154967: I've used `yield` to demonstrate that you don't need to generate all input filenames at once. [The Python yield keyword explained](http://stackoverflow.com/q/231767/4279) – jfs Jan 26 '12 at 18:58
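To illustrate the point about `yield` from the last comments: a generator produces each filename only when the consumer asks for the next one. This concerns the list of filenames, not the contents of the files. A minimal sketch, where the `log` list is a hypothetical helper added only to make the order of events visible:

```python
def files(log):
    """Yield filenames one at a time, recording when each one is produced."""
    for name in ["inputfile1", "inputfile2"]:
        log.append("produced " + name)
        yield name


log = []
for name in files(log):
    log.append("consumed " + name)

# log now alternates between "produced ..." and "consumed ..." entries:
# the second filename is not produced until the first has been consumed.
```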