
I am using Motor for async MongoDB operations. I have a GridFS bucket where I store large XML files (typically 30+ MB) in chunks of 8 MB. I want to incrementally parse the XML file using xmltodict. Here is how my code looks:

async def read_file(file_id):
    gfs_out: AsyncIOMotorGridOut = await gfs_bucket.open_download_stream(file_id)

    tmpfile = tempfile.SpooledTemporaryFile(mode="w+b")
    while data := await gfs_out.readchunk():
        tmpfile.write(data)

    tmpfile.seek(0)  # rewind before parsing
    xmltodict.parse(tmpfile)

I am pulling the chunks out one by one, writing them to an in-memory temporary file, and then parsing the entire file with xmltodict. Ideally I would want to parse it incrementally, since I don't need the entire XML object from the get-go.

The documentation for xmltodict suggests that we can add custom handlers to parse a stream, like this example:

>>> def handle_artist(_, artist):
...     print(artist['name'])
...     return True
>>> 
>>> xmltodict.parse(GzipFile('discogs_artists.xml.gz'),
...     item_depth=2, item_callback=handle_artist)
A Perfect Circle
Fantômas
King Crimson
Chris Potter
...

But the problem with this is that it expects a file-like object with a synchronous read() method, not a coroutine. Is there any way this can be achieved? Any help would be greatly appreciated.

Shiladitya Bose

1 Answer


No, not directly. There is no async parser in xmltodict, but you can use the rest of the library to build your own. You could even contribute it back to the community if you find it useful, once you do the due diligence on parameters to keep it compatible with the original parse.

A very oversimplified version would look something like this:

from xmltodict import _DictSAXHandler

# from https://github.com/martinblech/xmltodict/blob/8c8cdd2f17520e0422dbe8cda6156e98375e02d3/xmltodict.py:
try:
    from defusedexpat import pyexpat as expat
except ImportError:
    from xml.parsers import expat

async def async_parser(async_generator):
    parser = expat.ParserCreate('utf-8')
    handler = _DictSAXHandler(namespace_separator=":")
    # wire expat's callbacks to xmltodict's SAX handler
    parser.StartNamespaceDeclHandler = handler.startNamespaceDecl
    parser.StartElementHandler = handler.startElement
    parser.EndElementHandler = handler.endElement
    parser.CharacterDataHandler = handler.characters

    # the change to the original parser to make it async
    async for chunk in async_generator:
        parser.Parse(chunk, False)
    parser.Parse(b'', True)  # tell expat the document is complete
    return handler.item  # the same nested dict xmltodict.parse would return

# then you create the async generator from GFS
async def read_file(file_id):
    gfs_out: AsyncIOMotorGridOut = await gfs_bucket.open_download_stream(file_id)

    while data := await gfs_out.readchunk():
        yield data
    
# and pass it as a parameter to the parser (awaited from inside a coroutine)
document = await async_parser(read_file("the id"))

Keep in mind that the async file reader makes the parser asynchronous too, so you will need to await the result.
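For example, a minimal sketch of how it could be driven from an event loop, assuming the read_file and async_parser definitions above (the file id is just a placeholder):

import asyncio

async def main():
    # read_file/async_parser are the functions defined above
    document = await async_parser(read_file("the id"))
    print(list(document))  # top-level element name(s)

asyncio.run(main())

If you want the same callback-style streaming as the GzipFile example in the question, you can extend async_parser to accept item_depth and item_callback and forward them to _DictSAXHandler, which is how xmltodict.parse handles those options internally.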

Alex Blex