I have a huge XML file (almost 5 GB). I am trying to search through the file, find some tags, and rename them. I used the same idea as here to chunk the file into 10 MB chunks and search through each chunk; if a chunk contains a search item, it is passed to another helper that reads the chunk line by line and replaces the tag (for example, cim:ConformLoad rdf:ID should become cim:CustomerLoad rdf:ID). It does not work! It seems that when the processed chunks are merged and the file is written back, the output file starts from somewhere arbitrary in the input.
import re, threading, Queue

FILE_R = "C:\\Users\\USOMZIA\\Desktop\\ABB_Work\\ERCOT\\Modifying_cim_model\\omid2.xml"
FILE_WR = "C:\\Users\\USOMZIA\\Desktop\\ABB_Work\\ERCOT\\Modifying_cim_model\\x3.xml"
def get_chunks(file_r, size=1024 * 1024):
    # Yield (start, length) pairs; seeking ahead and then calling
    # readline() makes every chunk end on a line boundary
    with open(file_r, 'rb') as f:
        while True:
            start = f.tell()
            f.seek(size, 1)
            s = f.readline()
            yield start, f.tell() - start
            if not s:
                break
def process_line_by_line(file_r, chunk):
    # Reopen the file, seek to the start of the chunk, and rewrite any
    # line that contains one of the search strings
    with open(file_r, "rb") as f:
        f.seek(chunk[0])
        read_line_list = []
        for line_f in f.read(chunk[1]).splitlines():
            find_match = False
            for match_str in mapp:
                if match_str in line_f:
                    find_match = True
                    new_line = line_f.replace(match_str, mapp[match_str])
                    read_line_list.append(new_line)
                    break
            if not find_match:
                read_line_list.append(line_f)
    return read_line_list
def process(file_r, chunk):
    read_group_list = []
    with open(file_r, "r") as f:
        f.seek(chunk[0])
        s = f.read(chunk[1])
        # Only fall back to the slow line-by-line pass when the chunk
        # actually contains one of the search strings
        if pattern.search(s):
            read_group_list = process_line_by_line(file_r, chunk)
        else:
            read_group_list = f.read(chunk[1]).splitlines()
    return read_group_list
class Worker(threading.Thread):
    def run(self):
        while True:
            chunk = queue.get()
            if chunk is None:
                break
            # Appends in completion order, not necessarily in chunk order
            result.append(process(*chunk))
            queue.task_done()
import time
start_time = time.time()
mapp = {
    "cim:ConformLoad rdf:ID": "cim:CustomerLoad rdf:ID",
    "cim:Load rdf:ID": "cim:CustomerLoad rdf:ID",
    "cim:NonConformLoad rdf:ID": "cim:CustomerLoad rdf:ID",
    "cim:InductionMotorLoad rdf:ID": "cim:CustomerLoad rdf:ID",
    "cim:NonConformLoadGroup rdf:ID": "cim:ConformLoadGroup rdf:ID",
    "cim:NonConformLoad.LoadGroup": "cim:ConformLoad.LoadGroup",
    "/cim:ConformLoad>": "/cim:CustomerLoad>",
    "/cim:Load>": "/cim:CustomerLoad>",
    "/cim:NonConformLoad>": "/cim:CustomerLoad>",
    "/cim:InductionMotorLoad>": "/cim:CustomerLoad>",
    "/cim:NonConformLoadGroup>": "/cim:ConformLoadGroup>",
}
# Build one alternation out of all the search strings; re.escape keeps
# the "." in cim:NonConformLoad.LoadGroup from matching any character
reg_string = "|".join(re.escape(key) for key in mapp)
pattern = re.compile(reg_string)
# Binding the method once here is faster than writing mo = pattern.search(line) inside the loop
search = pattern.search
queue = Queue.Queue()
result = []

# Start the worker thread(s)
for i in range(1):
    w = Worker()
    w.setDaemon(True)
    w.start()

chunks = get_chunks(FILE_R, 10 * 1024 * 1024)
for chunk in chunks:
    print chunk
    queue.put((FILE_R, chunk))
queue.join()
with open(FILE_WR, "w") as f:
    for chunk_lines in result:
        for line in chunk_lines:
            f.write("%s\n" % line)
print time.time() - start_time
So I think the problem is that the jobs in the queue are not finished in order, and as a result the chunks end up in the result list out of sequence. Is there any way I can synchronize them, i.e. reassemble the output in the original chunk order, for example along the lines of the untested sketch below? Thank you for the help!
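Here is what I have in mind (untested; OrderedWorker, work_queue, and results_by_index are just names I made up, and it reuses process and get_chunks from above): give every chunk an index when it is queued, have the workers store each processed chunk in a dict under that index, and write the dict out in sorted index order so that completion order no longer matters.

results_by_index = {}
results_lock = threading.Lock()
work_queue = Queue.Queue()

class OrderedWorker(threading.Thread):
    def run(self):
        while True:
            index, file_r, chunk = work_queue.get()
            lines = process(file_r, chunk)
            with results_lock:
                # Keyed by chunk index, so completion order does not matter
                results_by_index[index] = lines
            work_queue.task_done()

for i in range(4):
    w = OrderedWorker()
    w.setDaemon(True)
    w.start()

# Tag each chunk with its position in the file before queueing it
for index, chunk in enumerate(get_chunks(FILE_R, 10 * 1024 * 1024)):
    work_queue.put((index, FILE_R, chunk))
work_queue.join()

with open(FILE_WR, "w") as f:
    for index in sorted(results_by_index):
        for line in results_by_index[index]:
            f.write("%s\n" % line)

Would that be the right approach, or is there a more standard way to do this?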