I have a huge XML file (almost 5 GB). I am trying to search through the file, find certain tags, and rename them. I borrowed the idea of chunking the file: split it into 10-megabyte chunks and search each chunk; if a chunk contains a search item, pass it to a helper that reads it line by line and replaces the tag. It does not work! Something seems to go wrong when the queued results are merged and written back, and the resulting file starts at an arbitrary point.

import re, threading, Queue
FILE_R = "C:\\Users\\USOMZIA\\Desktop\\ABB_Work\\ERCOT\\Modifying_cim_model\\omid2.xml"
FILE_WR = "C:\\Users\\USOMZIA\\Desktop\\ABB_Work\\ERCOT\\Modifying_cim_model\\x3.xml"
def get_chunks(file_r, size = 1024 * 1024):
    with open(file_r, 'rb') as f:
        while 1:
            start = f.tell()
            f.seek(size, 1)
            s = f.readline()
            yield start, f.tell() - start

            if not s:
                break

def process_line_by_line(file_r, chunk):
    with open(file_r, "rb") as f:
        f.seek(chunk[0])
        read_line_list = []
        for line_f in f.read(chunk[1]).splitlines():
            find_match = False
            for match_str in mapp:
                if match_str in str(line_f):
                    find_match = True
                    new_line = str(line_f).replace(match_str, mapp[match_str]) 
                    read_line_list.append(new_line)
                    break
            if not find_match:
                read_line_list.append(str(line_f))

    return read_line_list

def process(file_r, chunk):
    read_group_list = []
    with open(file_r, "r") as f:
        f.seek(chunk[0])
        s = f.read(chunk[1])
        if len(pattern.findall(s)) > 0:
            read_group_list = process_line_by_line(file_r, chunk)
        else:
            read_group_list = f.read(chunk[1]).splitlines()
    return read_group_list

class Worker(threading.Thread):
    def run(self):
        while 1:
            chunk = queue.get()
            if chunk is None:
                break
            result.append(process(*chunk))
            queue.task_done()       





import time, sys
start_time = time.time()
pattern_list = []
mapp = {"cim:ConformLoad rdf:ID": "cim:CustomerLoad rdf:ID", "cim:Load rdf:ID": "cim:CustomerLoad rdf:ID", "cim:NonConformLoad rdf:ID": "cim:CustomerLoad rdf:ID", 
        "cim:InductionMotorLoad rdf:ID": "cim:CustomerLoad rdf:ID", "cim:NonConformLoadGroup rdf:ID": "cim:ConformLoadGroup rdf:ID",
        "cim:NonConformLoad.LoadGroup": "cim:ConformLoad.LoadGroup",
        "/cim:ConformLoad>": "/cim:CustomerLoad>", "/cim:Load>": "/cim:CustomerLoad>", "/cim:NonConformLoad>": "/cim:CustomerLoad>",
        "/cim:InductionMotorLoad>": "/cim:CustomerLoad>", "/cim:NonConformLoadGroup>": "/cim:ConformLoadGroup>"}
# Join all search keys into one alternation, with no trailing "|"
reg_string = "|".join(mapp)
pattern = re.compile(r"cim:%s.*" %reg_string)
# This makes it faster than write an mo = pattern.search(line) in the loop
search = pattern.search
queue = Queue.Queue()
result = []
# Start the multithread
for i in range(1):
    w = Worker()
    w.setDaemon(1)
    w.start()

chunks = get_chunks(FILE_R, 10 * 1024 * 1024)
for chunk in chunks:
    print chunk
    queue.put((FILE_R, chunk))
queue.join()

with open(FILE_WR, "w") as f:
    for file_chunk in range(len(result)):
        for line in result[file_chunk]:
            f.write("%s\n" % line)


print time.time() - start_time

So I think the problem is that the jobs in the queue do not finish in order, so the results end up out of sequence. Is there any way to keep them synchronized? Thank you for the help!
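One way to rule out ordering problems when several workers are running is to tag each chunk with its index and reassemble the results by index before writing. The following is a minimal sketch of that idea, not the code above: `run_ordered` and its parameters are hypothetical names, it assumes `work` never raises, and it relies on CPython making individual dict assignments safe across threads.

```python
import threading
try:
    import queue             # Python 3
except ImportError:
    import Queue as queue    # Python 2, as in the code above


def run_ordered(items, work, n_workers=4):
    """Apply work() to each item on worker threads; return results in input order."""
    tasks = queue.Queue()
    results = {}             # index -> result; each key is written exactly once

    def worker():
        while True:
            job = tasks.get()
            if job is None:  # sentinel: no more work for this thread
                tasks.task_done()
                break
            index, item = job
            results[index] = work(item)
            tasks.task_done()

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()

    count = 0
    for index, item in enumerate(items):
        tasks.put((index, item))
        count += 1
    for _ in threads:
        tasks.put(None)      # one sentinel per worker

    tasks.join()
    for t in threads:
        t.join()

    # Reassemble by index, regardless of which job finished first.
    return [results[i] for i in range(count)]
```

Here `items` would be the chunks and `work` a function that processes one chunk. Because the output list is built from the indices, the order in which workers finish no longer matters.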

  • Do you really need to use `multithreading`? – stovfl Feb 27 '19 at 21:19
  • I was thinking of creating a pool, putting each chunk in the pool, and using multiprocessing, but I don't know whether that works. I just want to process the file in parallel to use as much CPU as I can. Do you have another idea? – omid Feb 27 '19 at 21:38
  • *"process the file in parallel to use as much CPU as I can"*: You are misled. Read [multiprocessing-vs-threading-python](https://stackoverflow.com/questions/3044580/multiprocessing-vs-threading-python) and [no-benefit-from-python-multi-threading-in-io-task](https://stackoverflow.com/questions/51164800/no-benefit-from-python-multi-threading-in-io-task) – stovfl Feb 27 '19 at 21:44
  • To be honest, I tried it with one to four processes, and changing the number of processes changes the running time. – omid Feb 27 '19 at 22:29

1 Answer

I think I found what the problem is:

read_group_list = f.read(chunk[1]).splitlines()

This line in the process function causes the problem. After I replaced it with:

read_group_list = s.splitlines()

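It gives me the correct file now. The chunk had already been read into `s`, so the second `f.read(chunk[1])` continued from the current file position and returned the bytes that follow the chunk.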
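To see why the original line misbehaves, here is a minimal sketch that uses an in-memory buffer in place of the XML file (the buffer contents are made up for illustration). A second `read` picks up where the first one stopped, so it returns the following chunk, not the same one:

```python
import io

# In-memory stand-in for the XML file (hypothetical data).
f = io.BytesIO(b"chunk-one|chunk-two|")

f.seek(0)
s = f.read(10)       # reads the chunk and advances the position to offset 10
again = f.read(10)   # starts at offset 10, so this is the NEXT chunk

assert s == b"chunk-one|"
assert again == b"chunk-two|"
```

Reusing `s`, as in the fix, avoids both the wrong data and a redundant read of the same 10 MB.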
