
I'm trying to consume the Exchange GetAttachment webservice using requests, lxml and base64io. This service returns a base64-encoded file in a SOAP XML HTTP response. The file content is contained in a single line in a single XML element. GetAttachment is just an example, but the problem is more general.

I would like to stream the decoded file contents directly to disk without holding the entire attachment in memory at any point, since an attachment could be several hundred MB.

I have tried something like this:

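import lxml.etree
import requests
from base64io import Base64IO
from gzip import GzipFile
from io import BytesIO

r = requests.post('https://example.com/EWS/Exchange.asmx', data=..., stream=True)
with open('foo.txt', 'wb') as f:
    for action, elem in lxml.etree.iterparse(GzipFile(fileobj=r.raw)):
        if elem.tag == '{http://schemas.microsoft.com/exchange/services/2006/types}Content':
            b64_decoder = Base64IO(BytesIO(elem.text.encode('ascii')))
            f.write(b64_decoder.read())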

but lxml still stores a copy of the attachment as elem.text. Is there any way I can create a fully streaming XML parser that also streams the content of an element directly from the input stream?

Martijn Pieters
Erik Cederstrand

1 Answer


Don't use iterparse in this case. The iterparse() method can only issue element start and end events, so any text in an element is given to you when the closing XML tag has been found.
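To illustrate the point, here is a minimal sketch using the standard library's xml.etree.ElementTree.iterparse(), which I'm assuming behaves like lxml's version in this respect. The payload and the element name are made up for the demo; the point is only that the whole text is already in memory when the end event arrives.

```python
import io
import xml.etree.ElementTree as ET

# Hypothetical stand-in for a large base64 attachment payload.
payload = 'QUJD' * 1000
doc = '<root><Content>{}</Content></root>'.format(payload).encode('ascii')

text_length = None
for event, elem in ET.iterparse(io.BytesIO(doc), events=('end',)):
    if elem.tag == 'Content':
        # When the end event fires, elem.text holds the complete text.
        text_length = len(elem.text)

print(text_length)
```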

Instead, use a SAX parser interface. SAX is a general standard for XML parsing libraries, in which the parser passes parsed data on to a content handler. The ContentHandler.characters() callback is passed character data in chunks (assuming that the implementing XML library actually makes use of this possibility). This is a lower-level API than the ElementTree API, and the Python standard library already bundles the Expat parser to drive it.
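As a rough sketch of what that looks like, the hypothetical ChunkRecorder handler below just records every characters() call for a made-up document. How many calls you get, and how large each chunk is, depends on the parser and its buffer size, so treat the count as something to observe rather than rely on.

```python
import io
from xml.sax import make_parser, handler


class ChunkRecorder(handler.ContentHandler):
    """Hypothetical handler that records each characters() call."""

    def __init__(self):
        super().__init__()
        self.chunks = []

    def characters(self, data):
        # Called once per piece of character data the parser hands over.
        self.chunks.append(data)


# Made-up payload, larger than the parser's read buffer.
payload = 'QUJD' * 50000
doc = '<Content>{}</Content>'.format(payload).encode('ascii')

recorder = ChunkRecorder()
parser = make_parser()
parser.setContentHandler(recorder)
parser.parse(io.BytesIO(doc))

# Joining the pieces gives back the full text, whatever the chunking was.
received = ''.join(recorder.chunks)
print(len(recorder.chunks), len(received))
```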

So the flow then becomes:

  • Wrap the incoming request stream in a GzipFile for easy decompression. Or, better still, set response.raw.decode_content = True and leave decompression to the requests library, based on the content-encoding the server has set.
  • Pass the GzipFile instance or raw stream to the .parse() method of a parser created with xml.sax.make_parser(). The parser then proceeds to read from the stream in chunks. By using make_parser() you can first enable features such as namespace handling (which ensures your code doesn't break if Exchange decides to alter the short prefixes used for each namespace).
  • The content handler's characters() method is called with chunks of character data; watch for the start event of the correct element, so you know when to expect base64 data. You can decode that base64 data in chunks of (a multiple of) 4 characters at a time, and write it to a file. I'd not use base64io here, just do your own chunking.
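The chunking in the last step can be sketched on its own, separate from any XML handling. The decode_base64_chunks() helper below is a hypothetical name, and it assumes the base64 text contains no whitespace (the question says the content is on a single line). It carries over whatever doesn't fill a group of 4 characters to the next chunk.

```python
from base64 import b64decode, b64encode


def decode_base64_chunks(chunks):
    """Yield decoded bytes from an iterable of base64 text chunks of any size."""
    remainder = ''
    for chunk in chunks:
        data = remainder + chunk
        # Only decode a multiple of 4 characters; keep the rest for later.
        usable = len(data) - len(data) % 4
        data, remainder = data[:usable], data[usable:]
        if data:
            yield b64decode(data)
    if remainder:
        raise ValueError("Incomplete Base64 data")


encoded = b64encode(b'Hello, world!').decode('ascii')
# Split into awkward 7-character pieces to exercise the carry-over.
pieces = [encoded[i:i + 7] for i in range(0, len(encoded), 7)]
decoded = b''.join(decode_base64_chunks(pieces))
print(decoded)
```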

A simple content handler could be:

from xml.sax import handler
from base64 import b64decode

class AttachmentContentHandler(handler.ContentHandler):
    types_ns = 'http://schemas.microsoft.com/exchange/services/2006/types'

    def __init__(self, filename):
        super().__init__()
        self.filename = filename

    def startDocument(self):
        self._buffer = None
        self._file = None

    def startElementNS(self, name, *args):
        if name == (self.types_ns, 'Content'):
            # we can expect base64 data next
            self._file = open(self.filename, 'wb')
            self._buffer = []

    def endElementNS(self, name, *args):
        if name == (self.types_ns, 'Content'):
            # all attachment data received, close the file
            try:
                if self._buffer:
                    raise ValueError("Incomplete Base64 data")
            finally:
                self._file.close()
                self._file = self._buffer = None

    def characters(self, data):
        if self._buffer is None:
            return
        self._buffer.append(data)
        self._decode_buffer()

    def _decode_buffer(self):
        remainder = ''
        for data in self._buffer:
            available = len(remainder) + len(data)
            overflow = available % 4
            if remainder:
                data = (remainder + data)
                remainder = ''
            if overflow:
                remainder, data = data[-overflow:], data[:-overflow]
            if data:
                self._file.write(b64decode(data))
        self._buffer = [remainder] if remainder else []

and you'd use it like this:

import requests
from xml.sax import make_parser, handler

parser = make_parser()
parser.setFeature(handler.feature_namespaces, True)
parser.setContentHandler(AttachmentContentHandler('foo.txt'))

r = requests.post('https://example.com/EWS/Exchange.asmx', data=..., stream=True)
r.raw.decode_content = True  # if content-encoding is used, decompress as we read
parser.parse(r.raw)

This will parse the input XML in chunks of up to 64KB (the default IncrementalParser buffer size), so attachment data is decoded in at most 48KB blocks of raw data.
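The 64KB to 48KB ratio follows from base64 encoding every 3 raw bytes as 4 characters. A quick check, taking 64KB as exactly 65,536 characters for the sake of the arithmetic:

```python
from base64 import b64decode

# 65,536 base64 characters ('AAAA' decodes to three zero bytes).
encoded_block = b'A' * 65536
raw_block = b64decode(encoded_block)

# 65536 / 4 * 3 = 49152 bytes, i.e. 48 KB of raw data.
raw_size = len(raw_block)
print(raw_size)
```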

I'd probably extend the content handler to take a target directory and then look for <t:Name> elements to extract the filename, then use that to extract the data to the correct filename for each attachment found. You'd also want to verify that you are actually dealing with a GetAttachmentResponse document, and handle error responses.
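A rough sketch of that extension might look like the following. NamedAttachmentHandler is a hypothetical name, the test document is made up, and the code assumes that <t:Name> appears before <t:Content> for each attachment. It does not verify the response type or handle error responses.

```python
import io
import os
import tempfile
from base64 import b64decode, b64encode
from xml.sax import make_parser, handler

TYPES_NS = 'http://schemas.microsoft.com/exchange/services/2006/types'


class NamedAttachmentHandler(handler.ContentHandler):
    """Sketch: write each <t:Content> to target_dir, named after <t:Name>."""

    def __init__(self, target_dir):
        super().__init__()
        self.target_dir = target_dir
        self._name_parts = None  # a list while inside <t:Name>
        self._name = None        # last complete attachment name seen
        self._file = None
        self._remainder = ''

    def startElementNS(self, name, qname, attrs):
        if name == (TYPES_NS, 'Name'):
            self._name_parts = []
        elif name == (TYPES_NS, 'Content'):
            if not self._name:
                raise ValueError("Content element without a preceding Name")
            # basename() is a basic guard against path components in the name
            path = os.path.join(self.target_dir, os.path.basename(self._name))
            self._file = open(path, 'wb')
            self._remainder = ''

    def characters(self, data):
        if self._name_parts is not None:
            self._name_parts.append(data)
        elif self._file is not None:
            # Decode a multiple of 4 characters, carry the rest over.
            data = self._remainder + data
            usable = len(data) - len(data) % 4
            self._remainder = data[usable:]
            if usable:
                self._file.write(b64decode(data[:usable]))

    def endElementNS(self, name, qname):
        if name == (TYPES_NS, 'Name'):
            self._name = ''.join(self._name_parts)
            self._name_parts = None
        elif name == (TYPES_NS, 'Content'):
            try:
                if self._remainder:
                    raise ValueError("Incomplete Base64 data")
            finally:
                self._file.close()
                self._file = self._name = None


# Made-up document standing in for part of a GetAttachment response.
doc = (
    '<t:FileAttachment xmlns:t="{ns}">'
    '<t:Name>hello.txt</t:Name>'
    '<t:Content>{data}</t:Content>'
    '</t:FileAttachment>'
).format(ns=TYPES_NS, data=b64encode(b'Hello, world!').decode('ascii')).encode('ascii')

target_dir = tempfile.mkdtemp()
parser = make_parser()
parser.setFeature(handler.feature_namespaces, True)
parser.setContentHandler(NamedAttachmentHandler(target_dir))
parser.parse(io.BytesIO(doc))

with open(os.path.join(target_dir, 'hello.txt'), 'rb') as f:
    written = f.read()
print(written)
```

With a real response you would pass r.raw to parser.parse() instead of the BytesIO object, as in the usage example above.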

Martijn Pieters
  • Thanks for the elegant solution and pointer to the SAX characters() method! I actually managed to solve it using iterparse(), by wrapping the input stream in a custom, buffering input stream, reacting to the start event and then intercepting my input stream, checking the buffer to find the start token and then consuming the stream until I find the end token, and finally positioning the stream at the start of the end token. The resulting code was really messy. Your solution is much cleaner and easier to understand. – Erik Cederstrand Oct 30 '18 at 10:28
  • @ErikCederstrand: the SAX event-driven interface can lead to somewhat harder to read code, as you now need to maintain a sort of state machine to track what elements have been seen, etc. If expanding on the class to handle multiple attachments, etc. gets out of hand, consider creating separate classes for each state and delegating to a `self.state` attribute, that should *keep* this cleaner and easier to understand. – Martijn Pieters Oct 30 '18 at 10:32
  • @Erik Cedarstrand, would you mind posting your iterparse solution? – atc_ceedee Apr 18 '22 at 14:51
  • Sure! As stated above, iterparse was not the right solution, though. This is my handler: https://github.com/ecederstrand/exchangelib/blob/1a7ae76fcd2a348ab1f028545ac271f0056ca10c/exchangelib/util.py#L295 and this is my parser: https://github.com/ecederstrand/exchangelib/blob/1a7ae76fcd2a348ab1f028545ac271f0056ca10c/exchangelib/util.py#L345 GitHub should point you to the usages. – Erik Cederstrand Apr 18 '22 at 18:18