
I'm working on a project to parse multiple XML files concurrently in Python using lxml. When I initialize the process, I want my main class to do some work on the XML before it passes the etree object to the process, but I'm finding that when the etree object arrives in the new process, the object itself survives but the XML inside it is gone and getroot() returns None.

I know that I can only pass picklable data through the queue, but is this also the case for what I pass to the process in the 'args' field?
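For reference, `Pool.apply_async` does pickle both `func` and `args` before they reach a worker, just like data sent through a `Queue`. One way to preview what a worker will receive, without starting a pool, is to round-trip the arguments through `pickle` in the parent. The helper below (`roundtrip_args`) is hypothetical, not part of `multiprocessing`, and only approximates the real path, since `multiprocessing` uses its own `ForkingPickler` subclass.

```python
import pickle

def roundtrip_args(args):
    """Hypothetical debugging helper: pickle and unpickle `args` roughly the way
    a pool worker would receive them. Raises ValueError if they cannot be sent."""
    try:
        payload = pickle.dumps(args)
    except (pickle.PicklingError, TypeError, AttributeError) as exc:
        raise ValueError(f"arguments cannot be sent to a worker: {exc}") from exc
    # Whatever comes back here is (approximately) what the worker sees.
    return pickle.loads(payload)
```

If the problem is in pickling, `roundtrip_args((tree,))[0].getroot()` should show the same `None` the worker reports.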

Here's my code:

import multiprocessing, multiprocessing.pool, time
from lxml import etree

def compute(tree):
    print("Start Process")
    print(type(tree)) # Returns <class 'lxml.etree._ElementTree'>
    print(id(tree)) # Returns new ID 44637320 as expected
    print(tree.getroot()) # Returns None

def pool_init(queue):
    # see http://stackoverflow.com/a/3843313/852994
    compute.queue = queue

class Main():
    def __init__(self):
        pass

    def main(self):
        tree = etree.parse('test.xml')
        print(id(tree)) # Returns object ID 43998536
        print(tree.getroot()) #Returns <Element SymCLI_ML at 0x29f5dc8>

        self.queue = multiprocessing.Queue()
        self.pool = multiprocessing.Pool(processes=1, initializer=pool_init, initargs=(self.queue,))
        self.pool.apply_async(func=compute, args=(tree,))
        time.sleep(10)

if __name__ == '__main__':
    Main().main()

Any and all help much appreciated.

UPDATE/ANSWER

Based on the answer below, I've modified it a bit and got it working with a much lower memory footprint, without using StringIO. The etree.tostring method returns a bytes object, which can be pickled; to unpickle it, the bytes can be parsed again by etree.

import multiprocessing, multiprocessing.pool, time, copyreg
from io import BytesIO  # needed by elementtree_unpickler below
from lxml import etree

def compute(tree):
    print("Start Process")
    print(type(tree)) # Returns <class 'lxml.etree._ElementTree'>
    print(tree.getroot()) # Returns <Element SymCLI_ML at 0x29f5dc8>. Success!

def pool_init(queue):
    # see http://stackoverflow.com/a/3843313/852994
    compute.queue = queue

def elementtree_unpickler(data):
    return etree.parse(BytesIO(data))

def elementtree_pickler(tree):
    return elementtree_unpickler, (etree.tostring(tree),)

copyreg.pickle(etree._ElementTree, elementtree_pickler, elementtree_unpickler)

class Main():
    def __init__(self):
        pass

    def main(self):
        tree = etree.parse('test.xml')
        print(tree.getroot()) #Returns <Element SymCLI_ML at 0x29f5dc8>

        self.queue = multiprocessing.Queue()
        self.pool = multiprocessing.Pool(processes=1, initializer=pool_init, initargs=(self.queue,))
        self.pool.apply_async(func=compute, args=(tree,))
        time.sleep(10)

if __name__ == '__main__':
    Main().main()

UPDATE 2

After doing some memory benchmarking, I found that passing large objects prevents them from being garbage-collected in the main process. This probably isn't an issue at small scale, but my etree objects were on the order of several hundred MB in memory. As soon as an async task has been submitted with an XML object in its arguments, that object cannot be cleared from memory, even if it is deleted in the main process and garbage collection is invoked manually. As a result, I've reverted to closing the XML in the main process and passing only the file name to the subprocess.
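The file-name approach can be sketched as follows, assuming each worker only needs to send back a small summary. The names `summarize` and `summarize_all` and the returned tuple are hypothetical. The sketch uses the standard library's `xml.etree.ElementTree` so it runs without lxml; `from lxml import etree as ET` should work in its place for the calls used here (`parse`, `getroot`, `iter`), though lxml's `iter()` also yields comments and processing instructions.

```python
import multiprocessing
import xml.etree.ElementTree as ET  # stand-in for lxml.etree in this sketch

def summarize(path):
    """Parse inside the worker and return only small, picklable values.
    The parsed tree never leaves this process, so the parent never holds it."""
    root = ET.parse(path).getroot()
    element_count = sum(1 for _ in root.iter())  # includes the root itself
    return path, root.tag, element_count

def summarize_all(paths, processes=2):
    """Send only file names to the pool. map() blocks until every result
    is back, so no time.sleep() is needed."""
    with multiprocessing.Pool(processes=processes) as pool:
        return pool.map(summarize, paths)
```

Call `summarize_all([...])` from inside the `if __name__ == '__main__':` guard, as in the question's script.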

proudmatt
  • Would it be possible to put the etree objects in shared memory and pass references to the shared memory location to sub-processes? – Andreas Yankopolus Jun 14 '17 at 13:16
  • How did this turn out? Did Andreas's suggestion help? – Dan Sep 03 '22 at 01:02
  • Dan, unfortunately I no longer have access to the codebase to be able to test. With the correct code I can't see why using shared memory in python 3.8+ wouldn't work and be garbage collected. – proudmatt Sep 04 '22 at 06:08
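Andreas's shared-memory idea can be sketched with `multiprocessing.shared_memory` (Python 3.8+), with one important caveat: an lxml tree is a C structure that cannot be placed in shared memory, so what is shared here is the serialized XML bytes, and each worker still parses its own copy. The helper names are hypothetical, and `xml.etree.ElementTree` again stands in for lxml. The parent keeps the returned handle and must `close()` and `unlink()` it once all workers are done; the length is passed separately because the block may be rounded up to a larger size on some platforms. Resource-tracker behavior when attaching from other processes differs between Python versions (see the `track` parameter added in 3.13).

```python
from multiprocessing import shared_memory
import xml.etree.ElementTree as ET  # stand-in for lxml.etree in this sketch

def publish_xml(data: bytes):
    """Parent side: copy serialized XML into a new shared-memory block.
    Returns the handle (keep it alive) plus the name and length workers need."""
    shm = shared_memory.SharedMemory(create=True, size=len(data))
    shm.buf[:len(data)] = data
    return shm, shm.name, len(data)

def parse_shared_xml(name: str, length: int):
    """Worker side: attach by name, copy the bytes out, then parse.
    Only the bytes are shared; the parsed tree is private to this process."""
    shm = shared_memory.SharedMemory(name=name)
    try:
        data = bytes(shm.buf[:length])  # copy before closing the mapping
    finally:
        shm.close()
    return ET.fromstring(data).tag
```

In the parent, `(name, length)` can go in the `args` of `apply_async`, since both are plain picklable values.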

1 Answer


Use the following code to register simple picklers/unpicklers for lxml Element/ElementTree objects. I used that in the past with lxml and zmq.

import copy_reg
try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO
from lxml import etree

def element_unpickler(data):
    return etree.fromstring(data)

def element_pickler(element):
    data = etree.tostring(element)
    return element_unpickler, (data,)

copy_reg.pickle(etree._Element, element_pickler, element_unpickler)

def elementtree_unpickler(data):
    data = StringIO(data)
    return etree.parse(data)

def elementtree_pickler(tree):
    data = StringIO()
    tree.write(data)
    return elementtree_unpickler, (data.getvalue(),)

copy_reg.pickle(etree._ElementTree, elementtree_pickler, elementtree_unpickler)
Georges Martin
  • I've added this (Python 3.4, so copy_reg is copyreg and the StringIO import is 'from io import StringIO'), but on the line that starts the process I get 'I/O error : write error'. Full code amended to the initial question. – proudmatt Sep 23 '14 at 13:23