You could control the byte pointer yourself. In this example, I use a dictionary of byte pointers and store it in a shelf. Then I export HTML files from that XML.
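In case it helps to see that idea in isolation, here is a minimal sketch (with made-up page counts and offsets, not taken from the script below) of keeping byte pointers in a shelf so an interrupted run can look up where it left off:
import shelve

# Hypothetical example: map "pages exported so far" -> "byte offset in the input XML"
pointers = shelve.open('chunk_pointers.log')
pointers['8'] = 123456  # after 8 pages, 123456 bytes of the input had been consumed
pointers.sync()

# On a later run, look up the offset for the page count to resume from
if '8' in pointers:
    byte_offset = pointers['8']
pointers.close()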
By default, the script prints the status every 1000 pages exported. It also generates a graph image ('xml-split-performance.jpg' at the root of the project, by default, like the one at the end of this post), so you can check that the process is working properly.
I'm using 8 workers by default: I store 8 pages in memory, then distribute those 8 pages to the 8 workers to be written. I'm not sure that's the best approach, but it worked well for my 50k HTML files.
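For reference, this is roughly the distribution pattern, as a stripped-down sketch with dummy page data (not the script itself): collect a few (path, contents) tuples in memory, then hand each one to a worker in the thread pool:
from multiprocessing.pool import ThreadPool as Pool

def save_page(path, contents):
    with open(path, 'w') as f:
        f.write(contents)

pool = Pool(8)
pages_queue = [('page_%s.html' % i, '<p>dummy page %s</p>' % i) for i in range(8)]
for path, contents in pages_queue:
    pool.apply_async(save_page, args=(path, contents))
pool.close()
pool.join()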
from multiprocessing.pool import ThreadPool as Pool
import matplotlib.pyplot as plt
import os, sys, datetime, math
from urlparse import urljoin
import shelve
class xml_split():
# Flags
track_line = False
last_line = False
output_file = False
all_pages_extracted = False
# Modes available:
# - Read output folder and calculate missing files: c (default)
# - Start from scratch: s
resume_modes = {
'complete_file_count': 'c',
'start_from_scratch': 's',
}
# Queue of pages in memory, before writing to file
pages_queue = []
def __init__(
self,
# Number of workers
pool_size = 8,
# Read x pages at a time
page_memory = 30,
# Path to the input xml file
input_xml_file_path = "sample.xml",
# File name prefix and extension
output_file_prefix = 'page_',
output_file_extension = '.html',
# Path to the output folder: will be created under the script path
page_files_folder = 'pages',
# Path to the cache folder: will be created under the script path
cache_files_folder = 'cache',
# filename of a graph that shows the average performance
graph_file_path = 'xml-split-performance.jpg',
# update graph each x number of pages extracted
update_graph_each = 1000,
# print status on stdout each x number of pages extracted
show_status_each = 1000,
# group files in folders of x pages
batch_size = 1000,
# tags to track
start_string = '<page>',
end_string = '</page>',
start_doc = '<pages>',
end_doc = '</pages>',
# A little template to output the status
log_template = """
Page: {page}
Time: {exported}
Time since the beginning: {queue}
Reporting each n exports: {show_status_each}
Time since last report: {process}
Average entry export time: {average}
""",
):
self.pool_size = pool_size
self.pool = Pool(pool_size)
self.input_xml_file_path = input_xml_file_path
self.input_file = open(input_xml_file_path)
self.output_file_prefix = output_file_prefix
self.output_file_extension = output_file_extension
self.page_files_folder = page_files_folder
self.cache_files_folder = cache_files_folder
self.page_memory = page_memory
self.show_status_each = show_status_each
self.update_graph_each = update_graph_each
self.batch_size = batch_size
self.graph_file_path = graph_file_path
self.start_string = start_string
self.end_string = end_string
self.end_doc = end_doc
self.start_doc = start_doc
self.log_template = log_template
self.chunk_tail = ''
# Set project path to the current script path
self.project_path = os.getcwd() + os.sep
# Folder to place output files
self.page_files_path = urljoin(self.project_path, self.page_files_folder) + os.sep
# Folder to place cache files
self.cache_files_path = urljoin(self.project_path, self.cache_files_folder) + os.sep
self.create_folder(self.page_files_path)
self.create_folder(self.cache_files_path)
# keep track of time, to calculate time spent
self.main_start = datetime.datetime.now()
self.start = self.main_start
# by default, set the resume mode to check output folder
self.resume_mode = self.resume_modes['complete_file_count']
# Uncomment this line to ask for user input, on the shell.
# self.resume_mode = raw_input("s) Start from scratch c) Resume from missing files:")
# Create or open a shelf to keep a cache of line number, page number, and performance stats
self.chunk_pointers = shelve.open(self.cache_files_path + 'chunk_pointers.log')
self.persistent_performance_tracker = shelve.open(self.cache_files_path + 'persistent_performance_tracker.log')
# Init shelf counter
# *** Resume from missing files on the output folder
# (Resume an interrupted operation by checking the existing files on the output folders)
if self.resume_mode == self.resume_modes['complete_file_count']:
previously_existent_file_count = 0
for output_root, output_dirnames, output_filenames in os.walk(self.page_files_path):
for dirname in output_dirnames:
for root, dirnames, filenames in os.walk(self.page_files_path + dirname):
for filename in filenames:
if filename.endswith(self.output_file_extension) and filename.startswith(self.output_file_prefix):
previously_existent_file_count += 1
resume_from_page = int(math.floor(previously_existent_file_count / self.pool_size) * self.pool_size)
if '%s' % (resume_from_page) in self.chunk_pointers:
self.page_count = resume_from_page
self.byte_count = self.chunk_pointers['%s' % self.page_count]
else:
self.byte_count = 0
self.page_count = 0
# *** Do not resume
elif self.resume_mode == self.resume_modes['start_from_scratch']:
self.byte_count = 0
self.page_count = 0
# Create folder if doesn't exist
def create_folder(self, path):
if not os.path.exists(path):
os.makedirs(path)
# Get 30 pages a time and store them in memory
def slice_file(self, start=0, end=30):
max_pages = end - start
chunk = self.chunk_tail
pages_stored = 0
while max_pages:
new_chunk = self.input_file.read(10000)
if new_chunk:
chunk += new_chunk
pages_stored = len(chunk.split(self.end_string))
if pages_stored > max_pages:
pages_for_next_slice = max_pages - pages_stored
if pages_for_next_slice == 0:
pages_for_next_slice = -1
self.chunk_tail = ''.join(chunk.split(self.end_string)[pages_for_next_slice:])
return ''.join(chunk.split(self.end_string)[0:max_pages])
else:
return ''.join(chunk.split(self.end_string))
def get_folder_name(self):
folder_name = int(math.floor(self.page_count / self.batch_size) * self.batch_size)
folder_name = '%s%s' % (folder_name, os.sep)
return folder_name
def save_page(self, path, file_contents):
with open(path, 'w') as output_file:
output_file.write(file_contents)
def process_queue(self):
for page in self.pages_queue:
self.pool.apply_async(
self.save_page,
args = (page[0], page[1])
)
def save_graph(self):
performance_seconds = []
performance_page_count = []
vals = self.persistent_performance_tracker.items()
ordered_vals = sorted(vals, key=lambda i: int(i[0]))
for val in ordered_vals:
performance_seconds += [val[1]]
performance_page_count += [val[0]]
plt.clf()
plt.plot(performance_page_count, performance_seconds)
plt.ylabel('Task duration progress')
plt.savefig(self.graph_file_path)
def handle_status_reports(self):
# Update graph
if self.page_count % self.update_graph_each == 0:
self.end = datetime.datetime.now()
average = (self.end - self.start) / self.show_status_each
average = average.total_seconds()
self.persistent_performance_tracker['%s' % self.page_count] = average
self.persistent_performance_tracker.sync()
self.save_graph()
# Print status to stdout
if self.page_count % self.show_status_each == 0:
self.end = datetime.datetime.now()
average = (self.end - self.start) / self.show_status_each
average = average.total_seconds()
log = self.log_template.format(
page= self.page_count,
exported = self.end,
average = average,
show_status_each = self.show_status_each,
process = self.end - self.start,
queue = self.end - self.main_start
)
self.persistent_performance_tracker['%s' % self.page_count] = average
self.persistent_performance_tracker.sync()
sys.stdout.write(log)
sys.stdout.flush()
self.start = datetime.datetime.now()
# Go through xml file lines and output data to html files
def read_xml(self):
tag_contents = ''
# Skip the pages that were already exported, to pick up where we left off
self.slice_file(0, self.page_count)
# self.slice_file(0, self.page_memory)
while self.all_pages_extracted == False:
# check if there are still bytes to read
try:
chunk = self.slice_file(0, self.page_memory)
except:
break
if not chunk:
break
pages_in_chunk = chunk.split(self.start_string)[1:]
for page_i, page_contents in enumerate(pages_in_chunk):
# new page start was found, count
self.page_count += 1
# create batch folder
if self.page_count % self.batch_size == 0 or self.page_count == 1:
self.create_folder(self.page_files_path + self.get_folder_name())
output_file_name = '{pre}{page_count}{ext}'.format(
pre = self.output_file_prefix,
page_count = self.page_count,
ext = self.output_file_extension
)
# if it's the last page, set the flag and ignore closing tag
if self.end_doc in page_contents:
page_contents = page_contents.split(self.end_doc)[0]
self.all_pages_extracted = True
self.pages_queue += [(
self.page_files_path + self.get_folder_name() + output_file_name,
tag_contents + page_contents
)]
self.handle_status_reports()
if self.page_count % self.pool_size == 0:
# keep track of byte pointers where worker pools start
self.chunk_pointers['%s' % self.page_count] = self.byte_count
self.chunk_pointers.sync()
self.process_queue()
self.pages_queue = []
self.byte_count += len(page_contents)
self.close()
def close(self):
self.process_queue()
self.save_graph()
self.chunk_pointers.close()
self.persistent_performance_tracker.close()
self.pool.close()
self.pool.join()
if __name__ == '__main__':
xso = xml_split()
xso.read_xml()
To create the XML file with 50 thousand HTML pages, in the format described in the question:
from loremipsum import get_sentences
from random import randint
page_template = """
<page>
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>This is article {article_id}. Article-ly stuff goes here.</title>
</head>
<body>
{page_contents}
</body>
</html>
</page>
"""
with open('sample.xml','w') as f:
f.write('<pages>\n')
for i in range(0,50000):
f.write(page_template.format(
article_id = i,
page_contents = ''.join(['<p>%s</p>\n' % s for s in get_sentences(randint(10,50))])
)
)
f.write('</pages>\n')
The script generates a matplotlib graph to keep track of the average entry extraction time, and it seems stable. This is the graph for 50,000 entries on my PC (entry export duration, in seconds, on the y axis, and number of entries exported on the x axis):

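If you want to look at the recorded averages without re-running the export, you can re-plot them from the shelf cache. This is just a sketch assuming the default 'cache' folder and shelf file name used above:
import shelve
import matplotlib.pyplot as plt

tracker = shelve.open('cache/persistent_performance_tracker.log')
# Keys are page counts stored as strings, values are average seconds per entry
points = sorted(tracker.items(), key=lambda item: int(item[0]))
plt.plot([page for page, seconds in points], [seconds for page, seconds in points])
plt.xlabel('Entries exported')
plt.ylabel('Average entry export time (s)')
plt.savefig('xml-split-performance-replot.jpg')
tracker.close()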
Feel free to cancel the process and launch it again. The default behaviour is to check existing exported files and resume from there.
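If you prefer to start from scratch instead, the simplest option (not built into the script, just an example using its default folder names) is to delete the output and cache folders before launching it again:
import os
import shutil

# Remove the exported pages and the cache shelves so the next run starts clean
for folder in ('pages', 'cache'):
    if os.path.exists(folder):
        shutil.rmtree(folder)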