What am I trying to do?
A Python script that reads a large gzipped file (1-8 GB) and matches on a field called "record", which can be "r" or "f". An "r" line and an "f" line can share the same ID; when they do, the script combines the two lines and saves the result in a dictionary with the ID as the key. To speed this up, I split the file into multiple smaller ones and run the function on the separate files in different processes, then combine the dictionaries created by each process at the end.
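For example, for a single ID the combined entry should end up looking like this (values made up):

from collections import defaultdict

resultDict = defaultdict(list)
resultDict["id123"].extend(["r_out1", "r_out2"])  # pieces from the "r" line
resultDict["id123"].extend(["f_out1", "f_out2"])  # pieces from the matching "f" line
# resultDict["id123"] -> ["r_out1", "r_out2", "f_out1", "f_out2"]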
First I tried multiprocessing.Pool, but it made the processing slower. I read somewhere that I should try multiprocessing Process and Queue instead, but now I am putting a large dictionary into the Queue and it looks like the Queue cannot handle its size: the script hangs at that point.
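For reference, the Pool attempt looked roughly like this (a simplified sketch, not the exact code I ran; parseFile stands in for the same parsing as mainFunction below, and the pool size is an arbitrary choice):

import multiprocessing
from collections import defaultdict

def parseFile(thisFile):
    # placeholder: same per-file parsing as mainFunction below,
    # but the partial dict is returned instead of put on a Queue
    resultDict = defaultdict(list)
    # ... read thisFile and fill resultDict ...
    return dict(resultDict)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)  # pool size picked arbitrarily
    partials = pool.map(parseFile, ["f1.log.gz", "f2.log.gz"])
    pool.close()
    pool.join()
    finalDict = defaultdict(list)
    for d in partials:
        finalDict.update(d)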
Any suggestions on how I can improve the processing time?
PS: I am sure this is CPU bound, because when I only read the lines and did a "pass" it took 97 seconds, while running the entire script took 600 seconds.
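By "pass" I mean roughly this baseline (a sketch; the file list is the same as in the full script below):

import gzip
import time

start = time.time()
for thisFile in ["f1.log.gz", "f2.log.gz"]:  # long list, as below
    with gzip.open(thisFile) as f:
        for lines in f:
            pass  # read only, no parsing: ~97 sec, vs ~600 sec for the full script
print time.time() - start

The full script: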
import gzip
import multiprocessing
from collections import defaultdict
def rLine(line, resultDict):
    id = line[2]
    rOutput = line[4] + " " + line[5]  # this is a long string
    rList = rOutput.split(" ")
    resultDict[id].extend(rList)
def fLine(line, resultDict):
    id = line[7]
    fOutput = line[4] + " " + line[5]  # this is a long string
    fList = fOutput.split(" ")
    resultDict[id].extend(fList)
def mainFunction(thisFile, output):
    resultDict = defaultdict(list)
    with gzip.open(thisFile) as f:
        for lines in f:
            line = lines.split(" ")
            record = line[0]
            if record == "f":
                fLine(line, resultDict)
            else:
                rLine(line, resultDict)
    output.put(resultDict)  # the script appears to hang here when resultDict is large
if __name__ == '__main__':
    finalDict = defaultdict(list)
    output = multiprocessing.Queue()
    processes = list()
    files = ["f1.log.gz", "f2.log.gz"]  # long list
    for f in files:
        processes.append(multiprocessing.Process(target=mainFunction, args=(f, output)))
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    for p in processes:
        finalDict.update(output.get())
    print finalDict