
I have a Python script that "streams" a very large gzip file using urllib3 and feeds it into a zlib.decompressobj configured to decode the gzip format. If the initial HTTP connection is interrupted, the zlib.decompressobj begins to throw errors after the connection is "resumed". See my source code below if you want to cut to the chase.

These errors occur even though the script initiates a new HTTP connection with a Range header specifying the number of bytes previously read, so the download resumes from whatever point the read had reached when the connection broke. I believe this arbitrary resume point is the source of my problem.

If I don't try to decompress the chunks of data as urllib3 reads them, but instead just write them to a file, everything works just fine, even when there is an interruption. The completed archive is valid: it is the same size as one downloaded by a browser, and the MD5 hash of the .gz file is the same as if I had downloaded the file directly with Chrome.

On the other hand, if I try to decompress the chunks of data coming in after the interruption, even with the Range header specified, the zlib library throws all kinds of errors. The most recent was `Error -3 while decompressing data: invalid block type`.
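
To illustrate what I mean, here is a minimal, self-contained sketch of how I understand zlib.decompressobj to behave (it uses a small in-memory gzip stream, not the real file): contiguous chunks decompress fine across many calls, but skipping bytes mid-stream, which is what a bad resume point amounts to, corrupts it.

import gzip
import zlib

# Small in-memory stand-in for the real .gz file (moderately compressible).
payload = b''.join(b'%d,%d\n' % (i, i * i) for i in range(50000))
gz_bytes = gzip.compress(payload)

# Case 1: contiguous 2048-byte chunks. The decompressobj keeps its context
# between calls and reproduces the original payload exactly.
d = zlib.decompressobj(zlib.MAX_WBITS | 16)
out = b''.join(d.decompress(gz_bytes[i:i + 2048]) for i in range(0, len(gz_bytes), 2048))
out += d.flush()
assert out == payload

# Case 2: skip 2048 bytes mid-stream, roughly what a wrong resume offset does.
# zlib usually raises an error (e.g. "invalid block type"); even if it does
# not, the recovered data no longer matches the original.
d = zlib.decompressobj(zlib.MAX_WBITS | 16)
first = d.decompress(gz_bytes[:4096])
try:
    rest = d.decompress(gz_bytes[6144:]) + d.flush()
    print("No exception, but data intact?", first + rest == payload)
except zlib.error as e:
    print("zlib error:", e)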

Additional Notes:

  • The site that I am using sends Accept-Ranges: bytes, meaning that I am able to submit Range headers to the server (a sketch of such a request follows this list).
  • I am not using the requests library in this script, since requests ultimately manages urllib3 anyway; I am using urllib3 directly in an attempt to cut out the middleman.
  • This script is an oversimplification of my ultimate goal, which is to stream the compressed data directly from where it is hosted, enrich it and store it in a MySQL database on the local network.
  • The most common problem I am encountering with the urllib3 (and requests) library is the IncompleteRead(self._fp_bytes_read, self.length_remaining) error.
    • This error only appears if the urllib3 library has been patched to raise an exception when an incomplete read occurs.
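
For reference, this is how I understand a ranged request is normally issued with urllib3 against this server. It is only a sketch (not my actual script): the Range header is sent on the request and the server should answer with 206 Partial Content.

import certifi
import urllib3

url = "https://opendata.rapid7.com/sonar.http/2021-11-27-1638020044-http_get_8899.json.gz"
http = urllib3.PoolManager(ca_certs=certifi.where())

# Ask the server to resume at byte 1,000,000 and keep going to the end.
resp = http.request(
    'GET',
    url,
    headers={'Range': 'bytes=1000000-'},
    preload_content=False)

# 206 means the server honoured the range; 200 means it ignored it and sent the whole file.
print(resp.status, resp.headers.get('Content-Range'))
resp.release_conn()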

My best guess:

I am guessing that the break in the data stream being fed to zlib.decompressobj causes zlib to lose context and start attempting to decompress the data from an odd location. Sometimes decompression does resume, but the output is garbled, which makes me believe the byte location used in the new Range header fell at the front of some bytes which are then incorrectly interpreted as headers. I do not know how to counteract this and I have been trying to solve it for several weeks. The fact that the data are still valid when downloaded whole (without being decompressed before completion), even when an interruption occurs, makes me believe that some "loss of context" within zlib is the cause.

Source Code: (Has been updated to include a "buffer")

This code is a little bit slapped together so forgive me. Also, this target gzip file is quite a lot smaller than the actual file I will be using. Additionally, the target file in this example will no longer be available from Rapid7 in about a month's time. You may choose to substitute a different .gz file if that suits you.

import urllib3
import certifi
import inspect
import os
import time
import zlib

def patch_urllib3():
    """Set urllib3's enforce_content_length to True by default."""
    previous_init = urllib3.HTTPResponse.__init__
    def new_init(self, *args, **kwargs):
        previous_init(self, *args, enforce_content_length = True, **kwargs)
    urllib3.HTTPResponse.__init__ = new_init

#Patch the urllib3 module to throw an exception for IncompleteRead
patch_urllib3()

#Set the target URL
url = "https://opendata.rapid7.com/sonar.http/2021-11-27-1638020044-http_get_8899.json.gz"

#Set the local filename
local_filename = '2021-11-27-1638020044-http_get_8899_script.json.gz'

#Configure the PoolManager to handle https (I think...)
http = urllib3.PoolManager(ca_certs=certifi.where())

#Initiate start bytes at 0 then update as download occurs
sum_bytes_read=0
session_bytes_read=0
total_bytes_read=0
#Dummy variable to silence console output from file write
writer=0

#Set zlib window bits to 16 bits for gzip decompression
decompressor = zlib.decompressobj(zlib.MAX_WBITS|16)

#Build a buffer list
buf_list=[]
i=0
while True:
    print("Building request. Bytes read:",total_bytes_read)
    resp = http.request(
        'GET',
        url,
        timeout=urllib3.Timeout(connect=15, read=40),
        preload_content=False)
    print("Setting headers.")
    #This header should cause the request to resume at "total_bytes_read"
    resp.headers['Range'] = 'bytes=%s' % (total_bytes_read)
    print("Local filename:",local_filename)
    #If file already exists then append to it
    if os.path.exists(local_filename):
        print("File already exists.")
        try:
            print("Starting appended download.")
            with open(local_filename, 'ab') as f:
                for chunk in resp.stream(2048):
                    buf_list.append(chunk)
                    #Use i to offset the chunk being read from the "buffer"
                    #I.E. load 3 chunks (0,1,2) in the buffer list before starting to read from it
                    if i >2: 
                        buffered_chunk=buf_list.pop(0)
                        writer=f.write(buffered_chunk)
                        #Comment out the below line to stop the error from occurring.
                        #File download should complete successfully even if interrupted when the following line is commented out.
                        decompressed_chunk=decompressor.decompress(buffered_chunk)
                    #Increment i so that the buffer list will fill before reading from it
                    i=i+1
                    session_bytes_read = resp._fp_bytes_read
                    #Sum bytes read is an updated value that isn't stored. It is only used for console print 
                    sum_bytes_read = total_bytes_read + session_bytes_read
                    print("[+] Bytes read:",str(format(sum_bytes_read, ",")), end='\r')
            print("\nAppended download complete.")
            break
        except Exception as e:
            print(e)
            #Add the current session bytes to the total bytes read each time the loop needs to repeat
            total_bytes_read=total_bytes_read+session_bytes_read
            print("Bytes Read:",total_bytes_read)
            #Round total_bytes_read back to the previous chunk boundary so that range can be re-requested
            total_bytes_read=total_bytes_read-(total_bytes_read%2048)-2048
            print("Rounded bytes Read:",total_bytes_read)
            #Pop the last entry off of the buffer since it may be incomplete
            buf_list.pop()
            #Reset i so that the buffer has to be rebuilt
            i=0
            print("Sleeping for 30 seconds before re-attempt...")
            time.sleep(30)
    #If file doesn't already exist then write to it directly
    else:
        print("File does not exist.")
        try:
            print("Starting initial download.")
            with open(local_filename, 'wb') as f:
                for chunk in resp.stream(2048):
                    buf_list.append(chunk)
                    #Use i to offset the chunk being read from the "buffer"
                    #I.E. load 3 chunks (0,1,2) in the buffer list before starting to read from it
                    if i > 2: 
                        buffered_chunk=buf_list.pop(0)
                        #print("Buffered Chunk",str(i-2),"-",buffered_chunk)
                        writer=f.write(buffered_chunk)
                        decompressed_chunk=decompressor.decompress(buffered_chunk)
                    #Increment i so that the buffer list will fill before reading from it
                    i=i+1
                    session_bytes_read = resp._fp_bytes_read
                    print("[+] Bytes read:",str(format(session_bytes_read, ",")), end='\r')
            print("\nInitial download complete.")
            break
        except Exception as e:
            print(e)
            #Set the total bytes read equal to the session bytes since this is the first failure
            total_bytes_read=session_bytes_read
            print("Bytes Read:",total_bytes_read)
            #Round total_bytes_read back to the previous chunk boundary so that range can be re-requested
            total_bytes_read=total_bytes_read-(total_bytes_read%2048)-2048
            print("Rounded bytes Read:",total_bytes_read)
            #Pop the last entry off of the buffer since it may be incomplete
            buf_list.pop()
            #Reset i so that the buffer has to be rebuilt
            i=0
            print("Sleeping for 30 seconds before re-attempt...")
            time.sleep(30)
    print("Looping...")
#Finish writing from buffer into file
#BE SURE TO SET TO "APPEND" with "ab" or you will overwrite the start of the file
f = open(local_filename, 'ab')
print("[+] Finishing write from buffer.")
while not len(buf_list) == 0:
    buffered_chunk=buf_list.pop(0)
    writer=f.write(buffered_chunk)
    decompressed_chunk=decompressor.decompress(buffered_chunk)
#Flush and close the file
f.flush()
f.close()
resp.release_conn()

Reproducing the error

  • To reproduce the error perform the following actions:
    • Run the script and let the download start
    • Be sure that the decompressed_chunk=decompressor.decompress(buffered_chunk) line is not commented out
    • Turn off your network connection until an exception is raised
    • Turn your network connection back on immediately.

If the decompressor.decompress(buffered_chunk) line is removed from the script then it will download the file, and the data can be successfully decompressed from the file afterwards. However, if that line is present and an interruption occurs, the zlib library will not be able to continue decompressing the data stream. I need to decompress the data as a stream because I cannot store the actual file I am trying to use.
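
For context, my ultimate goal (decompress, enrich, insert into MySQL) would consume the stream roughly like the sketch below. It is only an outline: iter_records is a made-up helper name, it assumes the file is newline-delimited JSON, and the enrichment/MySQL insert is left out.

import json
import zlib

def iter_records(chunks):
    """Yield one parsed JSON object per complete line from a stream of gzip chunks."""
    decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)
    pending = b''
    for chunk in chunks:
        pending += decompressor.decompress(chunk)
        lines = pending.split(b'\n')
        pending = lines.pop()  # the last element may be a partial line; keep it for the next chunk
        for line in lines:
            if line:
                yield json.loads(line)
    # Flush anything zlib still holds, plus a possible final line with no trailing newline.
    pending += decompressor.flush()
    if pending.strip():
        yield json.loads(pending)

# Intended use with the streaming response from the script above:
# for record in iter_records(resp.stream(2048)):
#     ...enrich the record and INSERT it into MySQL...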

Is there some way to prevent this from occurring? I have now attempted to add a "buffer" list that stores the chunks; after a failure the script discards the last chunk and moves back to a point in the file that precedes the "failed" chunk. I am able to re-establish the connection and even pull all the data back correctly, but even with the "buffer" my ability to decompress the stream is still broken. I must not be handing the data back to the decompressor smoothly somehow.

Visualization:

I put this together very quickly in an attempt to better describe what I am trying to do...

[Image: Streaming Decompress Visualization]

I bet Mark Adler is hiding out there somewhere...

Shrout1
1 Answer

r+b doesn't append. You would need to use ab for that. It appears that on the re-try, you are reading the entire gzip file again from the start. With r+b, that file is written correctly to your output file, by overwriting what was read before.

However, you are feeding the initial read to the decompressor, and then the start of the file again. Not surprisingly, the decompressor then soon detects invalid compressed data.
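
As a rough illustration of that failure mode, here is a sketch using a small in-memory gzip stream (not the poster's actual data): feed a decompressor a partial initial portion and then the whole file again from byte 0, and zlib complains almost immediately.

import gzip
import zlib

# Small stand-in gzip stream, purely for illustration.
payload = b''.join(b'record %d\n' % i for i in range(20000))
gz_bytes = gzip.compress(payload)

d = zlib.decompressobj(zlib.MAX_WBITS | 16)
out = d.decompress(gz_bytes[:4096])      # partial initial portion, as in the failed first attempt
try:
    out += d.decompress(gz_bytes)        # the "retry" re-sends the stream from byte 0
    out += d.flush()
    print("No error raised, output matches original?", out == payload)
except zlib.error as e:
    print("zlib error:", e)              # e.g. Error -3 while decompressing data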

Mark Adler
  • Thank you! I actually had caught the `r+b` thing and fixed it. I have updated it in my code. I've also added a "buffer" in the form of a list to store the binary data being retrieved in a queue... When you say I am reading the "start of the file again" what do you mean? I am attempting to read the stream (or file) from the point that it was interrupted by providing the `Range` header in the request using the `bytes_read` value. I believe that this point is arbitrary and does not contain some kind of header that `zlib` needs to see but that may be wrong. Is there an example of how to recover here? – Shrout1 Dec 06 '21 at 21:50
  • With `r+b`, your code was rewriting the .gz file from the beginning on every retry. Since the resulting file was always correct and complete, you can conclude that each retry is redownloading from the beginning. Your `bytes_read` thing isn't working. – Mark Adler Dec 06 '21 at 21:59
  • The decompressor on the other hand is _not_ starting over each time, so it's getting some partial initial portion of the .gz file, then again a partial initial portion, until the final retry when it gets the whole thing. So if the file is `abc`, then from the final retry `abc` is written to disk (due to `r+b`). However the decompressor is getting something like `abaabc`. So it is rightly complaining about invalid data. There is nothing wrong with the decompressor that needs to be recovered from. You simply need to provide it a valid data stream. – Mark Adler Dec 06 '21 at 22:00
  • Yeah you're right. It is *not* working with the `bytes_read` option even though I thought it was. I need to look more closely at the data that is getting sent back to me from the server. I cannot tell you how much I appreciate your time; you've helped me before! I dream of having a signed copy of the zlib RFC in my office some day :) I hope you have a happy holiday – Shrout1 Dec 06 '21 at 22:55