
I am writing a Python script that iterates over a large XML file of articles and writes each article out to a new file. I would also like to keep an index of where each article starts in the input file, so that if the script fails while processing (which takes many hours), I can pick up where I left off using something like file.seek().

This is my current code:

with open(inputPath, 'rb') as inputfile: # Iterate over the dump
    for line in inputfile:
        # do stuff...

I believe the file.tell() inconsistency applies here (iterating over a file uses read-ahead buffering), so it won't give me the correct file position. I would also like to get the position of the beginning of the line rather than the end.
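
To make the goal concrete, this is roughly the pattern I think I need: capture the offset with tell() before reading each line, so it can later be passed back to file.seek() (a sketch only, using inputPath from above):

with open(inputPath, 'rb') as inputfile:
    while True:
        position = inputfile.tell()   # byte offset of the start of the next line
        line = inputfile.readline()
        if not line:                  # end of file
            break
        # do stuff... and record `position` if this line starts an article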

The XML file looks like:

<pages>
  <page>
  This is an article.  Article-ly stuff goes here.
  </page>
  <page>
  ... x5,000,000
  </page>
</pages>

I would like to get the position of the beginning of the line with the <page> tag as I iterate over the file.

miller9904

2 Answers


Here's a solution based on your linked answer:

offsets = []
with open('test.xml', 'rb') as inputfile: # Iterate over the dump
    # Save 1st line offset
    current = inputfile.tell()
    for line in iter(inputfile.readline,''):
        if line.lstrip().startswith('<page>'):
            offsets.append(current)
        # Update to the current offset (line about to be read)
        current = inputfile.tell()

# Demo the offsets are lines with <page>
with open('test.xml', 'rb') as inputfile: # Iterate over the dump
    for offset in offsets:
        inputfile.seek(offset)
        print offset,inputfile.readline()

Output:

9   <page>

82   <page>
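
If you also need the index to survive a crash (the resume case in your question), one option is to append each offset to a small index file as it is found and seek() back to the last one on restart. A rough sketch; the index file name is just an example, and under Python 3 (binary file) the sentinel and prefix become b'' and b'<page>', which also work in Python 2:

# Record each <page> offset as it is found, so a crashed run can resume.
with open('test.xml', 'rb') as inputfile, open('offsets.idx', 'a') as index:
    current = inputfile.tell()
    for line in iter(inputfile.readline, b''):
        if line.lstrip().startswith(b'<page>'):
            index.write('%d\n' % current)
            index.flush()             # make sure the offset reaches disk
        current = inputfile.tell()

# On restart: read the last saved offset and jump straight to it.
with open('offsets.idx') as index:
    saved = [int(n) for n in index if n.strip()]
if saved:
    with open('test.xml', 'rb') as inputfile:
        inputfile.seek(saved[-1])
        print(inputfile.readline())   # the last <page> line recorded so far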
Mark Tolonen
  • I'm processing a 57 GB xml file, and I think loading each line one at a time would be too slow. – miller9904 Oct 29 '16 at 15:30
  • @miller9904, that wasn't stated in the question and is what you were doing in your question's code. I answered the question as asked. – Mark Tolonen Oct 29 '16 at 18:02

You could keep track of the byte pointer yourself. In this example, I store the byte pointers, keyed by page count, in a shelf, and then export HTML files from the XML.

By default, the script prints a status report every 1,000 pages exported and generates a graph image ('xml-split-performance.jpg' in the project root by default, like the one at the end of this post), so you can check that the process is running properly.

I'm using 8 workers by default: pages are queued in memory and, every 8 pages, handed off to the 8 workers to be written out. I'm not sure that's the best approach, but it worked well for my 50,000 HTML files.
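
Stripped to its essentials, the bookkeeping is just a shelf that maps page counts to byte offsets. A minimal sketch of that idea (simplified names; the full script below skims forward with slice_file() instead of calling seek(), but the stored pointers could be used either way):

import shelve

# A shelf mapping page counts to the byte offset where that batch starts.
chunk_pointers = shelve.open('cache/chunk_pointers.log')

def record_pointer(page_count, byte_count):
    # shelve keys must be strings
    chunk_pointers['%s' % page_count] = byte_count
    chunk_pointers.sync()

def resume_offset(page_count):
    # Byte offset to seek() to, or 0 to start from scratch.
    return chunk_pointers.get('%s' % page_count, 0)

The full script: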

from multiprocessing.pool import ThreadPool as Pool
import matplotlib.pyplot as plt
import itertools, os, sys, datetime, math
from urlparse import urljoin
import shelve
import collections
from functools import partial

class xml_split():

    # Flags
    track_line = False
    last_line = False
    output_file = False
    all_pages_extracted = False

    # Modes available: 
    # - Read output folder and calculate missing files: c (default)
    # - Start from scratch: s
    resume_modes = {
        'complete_file_count': 'c',
        'start_from_scratch': 's',
    }

    # Queue of pages in memory, before writing to file
    pages_queue = []

    def __init__(
            self,

            # Number of workers
            pool_size = 8,

            # Read x pages at a time
            page_memory = 30,

            # Path to the input xml file
            input_xml_file_path = "sample.xml",

            # File name prefix and extension
            output_file_prefix = 'page_',   
            output_file_extension = '.html',

            # Path to the output folder: will be created under the script path
            page_files_folder = 'pages',   

            # Path to the cache folder: will be created under the script path
            cache_files_folder = 'cache',  

            # filename of a graph that shows the average performance
            graph_file_path = 'xml-split-performance.jpg',

            # update graph each x number of pages extracted
            update_graph_each = 1000,

            # print status on stdout each x number of pages extracted
            show_status_each = 1000,

            # group files in folders of x pages
            batch_size = 1000,

            # tags to track
            start_string = '<page>',
            end_string = '</page>',
            start_doc = '<pages>',
            end_doc = '</pages>',

            # A little template to output the status
            log_template = """
            Page:                                  {page}
            Time:                                  {exported}
            Time since the beginning:              {queue}
            Reporting each n exports:              {show_status_each}
            Time since last report:                {process}
            Average entry export time:             {average}
            """,
        ):

        self.pool_size = pool_size
        self.pool = Pool(pool_size)

        self.input_xml_file_path = input_xml_file_path
        self.input_file = open(input_xml_file_path)

        self.output_file_prefix = output_file_prefix
        self.output_file_extension = output_file_extension
        self.page_files_folder = page_files_folder
        self.cache_files_folder = cache_files_folder

        self.page_memory = page_memory
        self.show_status_each = show_status_each
        self.update_graph_each = update_graph_each
        self.batch_size = batch_size

        self.graph_file_path = graph_file_path

        self.start_string = start_string
        self.end_string = end_string
        self.end_doc = end_doc
        self.start_doc = start_doc
        self.log_template = log_template
        self.chunk_tail = ''

        # Set project path to the current script path
        self.project_path = os.getcwd() + os.sep

        # Folder to place output files
        self.page_files_path = urljoin(self.project_path, self.page_files_folder) + os.sep

        # Folder to place cache files
        self.cache_files_path = urljoin(self.project_path, self.cache_files_folder) + os.sep

        self.create_folder(self.page_files_path)
        self.create_folder(self.cache_files_path)

        # keep track of time, to calculate time spent
        self.main_start = datetime.datetime.now()
        self.start = self.main_start

        # by default, set the resume mode to check output folder
        self.resume_mode = self.resume_modes['complete_file_count']

        # Uncomment this line to ask for user input, on the shell.
        # self.resume_mode = raw_input("s) Start from scratch c) Resume from missing files:")

        # Create or open shelves that cache byte pointers per page count, plus performance stats
        self.chunk_pointers = shelve.open(self.cache_files_path + 'chunk_pointers.log')
        self.persistent_performance_tracker = shelve.open(self.cache_files_path + 'persistent_performance_tracker.log')


        # Init shelf counter


        # *** Resume from missing files on the output folder
        # (Resume an interrupted operation by checking the existing files on the output folders)
        if self.resume_mode == self.resume_modes['complete_file_count']:
            previously_existent_file_count = 0
            for output_root, output_dirnames, output_filenames in os.walk(self.page_files_path):
                for dirname in output_dirnames:
                    for root, dirnames, filenames in os.walk(self.page_files_path + dirname):
                        for filename in filenames:
                            if filename.endswith(self.output_file_extension) and filename.startswith(self.output_file_prefix):
                                previously_existent_file_count += 1

            resume_from_page = int(math.floor(previously_existent_file_count / self.pool_size) * self.pool_size)
            if '%s' % (resume_from_page) in self.chunk_pointers:
                self.page_count = resume_from_page
                self.byte_count = self.chunk_pointers['%s' % self.page_count]
            else:
                self.byte_count = 0
                self.page_count = 0

        # *** Do not resume
        elif self.resume_mode == self.resume_modes['start_from_scratch']:
            self.byte_count = 0
            self.page_count = 0

    # Create folder if doesn't exist
    def create_folder(self, path):
        if not os.path.exists(path):
            os.makedirs(path)

    # Read up to (end - start) pages at a time and keep them in memory
    def slice_file(self, start=0, end=30):
        max_pages = end - start
        chunk = self.chunk_tail
        pages_stored = 0
        while max_pages:
            new_chunk = self.input_file.read(10000)
            if new_chunk:

                chunk += new_chunk
                pages_stored = len(chunk.split(self.end_string))

                if pages_stored > max_pages:
                    pages_for_next_slice = max_pages - pages_stored
                    if pages_for_next_slice == 0:
                        pages_for_next_slice = -1
                    self.chunk_tail = ''.join(chunk.split(self.end_string)[pages_for_next_slice:])
                    return ''.join(chunk.split(self.end_string)[0:max_pages])

            else:
                return ''.join(chunk.split(self.end_string))


    def get_folder_name(self):
        folder_name = int(math.floor(self.page_count / self.batch_size) * self.batch_size)
        folder_name = '%s%s' % (folder_name, os.sep)
        return folder_name

    def save_page(self, path, file_contents):
        with open(path, 'w') as output_file:
            output_file.write(file_contents)

    def process_queue(self):
        for page in self.pages_queue:
            self.pool.apply_async(
                self.save_page, 
                args = (page[0], page[1])
            )

    def save_graph(self):

        performance_seconds = []
        performance_page_count = []
        vals = self.persistent_performance_tracker.items()
        ordered_vals = sorted(vals, key=lambda i: int(i[0]))

        for val in ordered_vals:
            performance_seconds += [val[1]]
            performance_page_count += [val[0]]


        plt.clf()
        plt.plot(performance_page_count, performance_seconds)
        plt.ylabel('Task duration progress')

        plt.savefig(self.graph_file_path)

    def handle_status_reports(self):

        # Update graph
        if self.page_count % self.update_graph_each == 0:

            self.end = datetime.datetime.now()
            average = (self.end - self.start) / self.show_status_each
            average = average.total_seconds()


            self.persistent_performance_tracker['%s' % self.page_count] = average
            self.persistent_performance_tracker.sync()

            self.save_graph()

        # Print status to stdout
        if self.page_count % self.show_status_each == 0:

            self.end = datetime.datetime.now()
            average = (self.end - self.start) / self.show_status_each
            average = average.total_seconds()

            log = self.log_template.format(
                    page= self.page_count,
                    exported = self.end,
                    average = average,
                    show_status_each = self.show_status_each,
                    process = self.end - self.start,
                    queue = self.end - self.main_start
                )


            self.persistent_performance_tracker['%s' % self.page_count] = average
            self.persistent_performance_tracker.sync()

            sys.stdout.write(log)
            sys.stdout.flush()
            self.start = datetime.datetime.now()



    # Go through xml file lines and output data to html files
    def read_xml(self):

        tag_contents = ''

        # Skim forward to the resume page (re-reads the file from the beginning)
        self.slice_file(0, self.page_count)


        # self.slice_file(0, self.page_memory)

        while self.all_pages_extracted == False:
            # check if there are still bytes to read
            try:
                chunk = self.slice_file(0, self.page_memory)
            except:
                break

            if not chunk:
                break

            pages_in_chunk = chunk.split(self.start_string)[1:]


            for page_i, page_contents in enumerate(pages_in_chunk):

                # new page start was found, count
                self.page_count += 1

                # create batch folder
                if self.page_count % self.batch_size == 0 or self.page_count == 1:
                    self.create_folder(self.page_files_path + self.get_folder_name())


                output_file_name = '{pre}{page_count}{ext}'.format(
                    pre = self.output_file_prefix, 
                    page_count = self.page_count, 
                    ext = self.output_file_extension
                )

                # if it's the last page, set the flag and ignore closing tag
                if self.end_doc in page_contents:
                    page_contents = page_contents.split(self.end_doc)[0]
                    self.all_pages_extracted = True

                self.pages_queue += [(
                    self.page_files_path + self.get_folder_name() + output_file_name,
                    tag_contents + page_contents
                )]

                self.handle_status_reports()

                if self.page_count % self.pool_size == 0:
                    # keep track of byte pointers where worker pools start
                    self.chunk_pointers['%s' % self.page_count] = self.byte_count
                    self.chunk_pointers.sync()
                    self.process_queue()
                    self.pages_queue = []

                self.byte_count += len(page_contents)


        self.close()


    def close(self):
        self.process_queue()
        self.save_graph()
        self.chunk_pointers.close()
        self.persistent_performance_tracker.close()
        self.pool.close()
        self.pool.join()

if __name__ == '__main__':

    xso = xml_split()

    xso.read_xml()

To create the sample XML with 50,000 HTML pages, in the format described in the question:

from loremipsum import get_sentences
from random import randint

page_template = """
  <page>
      <!DOCTYPE html>
      <html lang="en">
      <head>
          <meta charset="UTF-8">
          <title>This is article {article_id}.  Article-ly stuff goes here.</title>
      </head>
      <body>
          {page_contents}
      </body>
      </html>
  </page>
"""

with open('sample.xml','w') as f:
    f.write('<pages>\n')
    for i in range(0,50000):
        f.write(page_template.format(
            article_id = i,
            page_contents = ''.join(['<p>%s</p>\n' % s for s in get_sentences(randint(10,50))])
            )
        )
    f.write('</pages>\n')

The script generates a matplotlib graph to keep track of the average entry extraction time, and it seems stable. This is the graph for 50,000 entries on my PC (entry export duration in seconds on the y axis, number of entries exported on the x axis):

[graph showing average time for each entry export]

Feel free to cancel the process and launch it again. The default behaviour is to check existing exported files and resume from there.
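
If you want to check where a resumed run will pick up, the byte-pointer shelf can be opened directly (assuming the default cache folder created by the script):

import shelve

# Print the saved resume points: page count -> byte offset in the input XML.
chunk_pointers = shelve.open('cache/chunk_pointers.log')
for page_count in sorted(chunk_pointers, key=int):
    print('page %s -> byte %s' % (page_count, chunk_pointers[page_count]))
chunk_pointers.close()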

Ivan Chaer
  • I'm not familiar with lxml and such. It appears as though this loads the whole file into memory, which wouldn't work because my file is a 57GB Wikipedia dump. Right now I'm running through the file line by line and splitting when it finds a `<page>` tag. I made the mistake of simply writing line numbers instead of positions into my index. – miller9904 Oct 26 '16 at 15:33
  • You can see my current script here: https://gist.github.com/miller9904/e51e698154fe388484c6868eec09988b – miller9904 Oct 26 '16 at 15:35
  • Sorry, I couldn't go through your code yet. I updated the answer with another approach. Let me know what you think? – Ivan Chaer Oct 26 '16 at 21:31
  • Hey, I really appreciate your effort on my question. I spent a while working on your code and I never could get it to execute correctly. I think it has something to do with my directory tree being weird. Am I correct in assuming that your program loops over each line in the file to find where it left off? I was already doing that in my script, and if the process is very far along through the file, it takes hours to loop back to where it left off. I was really hoping to find a way to determine the position in the file so I could call `file.seek()` and jump straight to where it left off. – miller9904 Oct 27 '16 at 12:02