Building on the script from this answer, I have the following scenario: a folder containing 2,500 large text files (~55 MB each), all tab-delimited. Web logs, basically.
I need to MD5-hash the second 'column' in each row of each file, saving the modified files elsewhere. The source files are on a mechanical disk and the destination files are on an SSD.
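For concreteness, the per-line transformation is just this (a minimal sketch; `hash_username` is a name I've made up for illustration, and the tab split and column index match the full script below):

    import hashlib

    def hash_username(line):
        # Split on tabs and MD5 the second field, unless it's the "-" placeholder
        fields = line.split("\t")
        if fields[1] != "-":
            fields[1] = hashlib.md5(fields[1]).hexdigest()
        return "\t".join(fields)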
The script processes the first 25 or so files very quickly, then slows WAY down. Extrapolating from those first 25 files, the whole job should finish in about 2 minutes; at the rate it settles into afterwards, it will take about 15 minutes to complete them all.
It's running on a server with 32 GB of RAM, and Task Manager rarely shows more than 6 GB in use. I have it set to launch 6 processes, but CPU usage on the cores is low, rarely going over 15%.
Why is it slowing down? Read/write contention on the disks? The garbage collector? Bad code? Any ideas about how to speed it up?
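To help pin down whether the per-file time itself is growing, I could instrument the worker loop with per-file timings, something like this (a sketch; `process_one_file` is a hypothetical stand-in for the per-file body of `ThreadRunner.run` below):

    import time

    job_start = time.time()
    for current_file in self.files_to_process:
        file_start = time.time()
        process_one_file(current_file)  # hypothetical: the per-file read/hash/write loop
        print "%s took %.1fs (%.1fs elapsed)" % (current_file, time.time() - file_start, time.time() - job_start)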
Here's the script:
import os
import hashlib
import threading
from multiprocessing import Process


class ThreadRunner(threading.Thread):
    """ This class represents a single instance of a running thread"""

    def __init__(self, fileset, filedirectory):
        threading.Thread.__init__(self)
        self.files_to_process = fileset
        self.filedir = filedirectory

    def run(self):
        for current_file in self.files_to_process:
            # Open the current file as read only
            active_file_name = self.filedir + "/" + current_file
            output_file_name = "D:/hashed_data/" + "hashed_" + current_file
            active_file = open(active_file_name, "r")
            output_file = open(output_file_name, "ab+")

            for line in active_file:
                # Load the line, hash the username, save the line
                lineList = line.split("\t")
                if lineList[1] != "-":
                    lineList[1] = hashlib.md5(lineList[1]).hexdigest()
                lineOut = '\t'.join(lineList)
                output_file.write(lineOut)

            # Always close files after you open them
            active_file.close()
            output_file.close()
            print "\nCompleted " + current_file


class ProcessRunner:
    """ This class represents a single instance of a running process """

    def runp(self, pid, numThreads, fileset, filedirectory):
        mythreads = []
        for tid in range(numThreads):
            th = ThreadRunner(fileset, filedirectory)
            mythreads.append(th)
        for i in mythreads:
            i.start()
        for i in mythreads:
            i.join()


class ParallelExtractor:
    def runInParallel(self, numProcesses, numThreads, filedirectory):
        myprocs = []
        prunner = ProcessRunner()

        # Store the file names from that directory in a list that we can iterate
        file_names = os.listdir(filedirectory)

        # Deal the file names round-robin into one set per process
        file_sets = []
        for i in range(numProcesses):
            file_sets.append([])
        for index, name in enumerate(file_names):
            num = index % numProcesses
            file_sets[num].append(name)

        for pid in range(numProcesses):
            pr = Process(target=prunner.runp,
                         args=(pid, numThreads, file_sets[pid], filedirectory))
            myprocs.append(pr)
        for i in myprocs:
            i.start()
        for i in myprocs:
            i.join()


if __name__ == '__main__':
    file_directory = "E:/original_data"
    processes = 6
    threads = 1

    extractor = ParallelExtractor()
    extractor.runInParallel(numProcesses=processes, numThreads=threads, filedirectory=file_directory)
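For reference, the same fan-out could be expressed more flatly with multiprocessing.Pool. This is only a sketch under the same paths and settings as above, not what I'm currently running; note I've also opened the output with "wb" instead of "ab+" so a rerun overwrites instead of appending:

    import os
    import hashlib
    from multiprocessing import Pool

    SRC_DIR = "E:/original_data"
    DST_DIR = "D:/hashed_data"

    def hash_file(name):
        # The same per-file work as ThreadRunner.run, as a plain top-level function
        src = open(SRC_DIR + "/" + name, "r")
        dst = open(DST_DIR + "/hashed_" + name, "wb")
        for line in src:
            fields = line.split("\t")
            if fields[1] != "-":
                fields[1] = hashlib.md5(fields[1]).hexdigest()
            dst.write("\t".join(fields))
        src.close()
        dst.close()

    if __name__ == '__main__':
        pool = Pool(processes=6)
        pool.map(hash_file, os.listdir(SRC_DIR))  # Pool distributes the files across workers
        pool.close()
        pool.join()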