
I am able to generate and stream text on the fly, but unable to generate and stream a compressed file on the fly.

from flask import Flask, request, Response, stream_with_context
import zlib
import gzip

app = Flask(__name__)

def generate_text():
    for x in range(10000):
        yield f"this is my line: {x}\n".encode()

@app.route('/stream_text')
def stream_text():
    response = Response(stream_with_context(generate_text()))
    return response

def generate_zip():
    for x in range(10000):
        yield zlib.compress(f"this is my line: {x}\n".encode())

@app.route('/stream_zip')
def stream_zip():
    response = Response(stream_with_context(generate_zip()), mimetype='application/zip')
    response.headers['Content-Disposition'] = 'attachment; filename=data.gz'
    return response

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000, debug=True)

Then, using curl and gunzip:

curl http://127.0.0.1:8000/stream_zip > data.gz

gunzip data.gz
gunzip: data.gz: not in gzip format

I don't care if it is zip, gzip, or any other type of compression.

generate_text in my real code generates over 4 GB of data, so I would like to compress it on the fly.

Saving the text to a file, zipping it, returning the zip file, and then deleting it is not the solution I'm after.

I need to be in a loop: generate some text -> compress that text -> stream the compressed data, until I'm done.

zip/gzip ... anything is fine as long as it works.

mattjvincent

4 Answers


You are yielding a series of separate compressed documents, not a single compressed stream. Don't use zlib.compress(); each call includes the header and forms a complete document by itself.

You need to create a zlib.compressobj() object instead, and use the Compress.compress() method on that object to produce a stream of data (followed by a final call to Compress.flush()):

def generate_zip():
    compressor = zlib.compressobj()
    for x in range(10000):
        chunk = compressor.compress(f"this is my line: {x}\n".encode())
        if chunk:
            yield chunk
    yield compressor.flush()

The compressor can produce empty blocks when there is not enough data yet to produce a full compressed-data chunk; the code above only yields if there is actually anything to send. Because your input data is so highly repetitive and thus compresses very efficiently, this yields only 3 times (once with the 2-byte header, once with about 21kb of compressed data covering the first 8288 iterations over range(), and finally with the remaining 4kb for the rest of the loop).

In aggregate, this produces the same data as a single zlib.compress() call with all inputs concatenated. The correct mime-type for this data format is application/zlib, not application/zip.
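
As a quick sanity check (an illustrative sketch, not part of the original answer), you can concatenate the yielded chunks outside of Flask and decompress them in one go, which confirms they form a single valid zlib stream:

import zlib

# Join all chunks produced by the generate_zip() generator above and
# decompress them as one zlib document.
chunks = b"".join(generate_zip())
original = b"".join(f"this is my line: {x}\n".encode() for x in range(10000))
assert zlib.decompress(chunks) == original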

This format is not readily decompressible with gzip however, not without some trickery. That's because the above doesn't yet produce a GZIP file, it just produces a raw zlib-compressed stream. To make it GZIP compatible, you need to configure the compression correctly, send a header first, and add a CRC checksum and data length value at the end:

import zlib
import struct
import time

def generate_gzip():
    # Yield a gzip file header first.
    yield bytes([
        0x1F, 0x8B, 0x08, 0x00,  # Gzip file, deflate, no filename
        *struct.pack('<L', int(time.time())),  # compression start time
        0x02, 0xFF,  # maximum compression, no OS specified
    ])

    # bookkeeping: the compression state, running CRC and total length
    compressor = zlib.compressobj(
        9, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
    crc = zlib.crc32(b"")
    length = 0

    for x in range(10000):
        data = f"this is my line: {x}\n".encode()
        chunk = compressor.compress(data)
        if chunk:
            yield chunk
        crc = zlib.crc32(data, crc) & 0xFFFFFFFF
        length += len(data)

    # Finishing off, send remainder of the compressed data, and CRC and length
    yield compressor.flush()
    yield struct.pack("<2L", crc, length & 0xFFFFFFFF)

Serve this as application/gzip:

@app.route('/stream_gzip')
def stream_gzip():
    response = Response(stream_with_context(generate_gzip()), mimetype='application/gzip')
    response.headers['Content-Disposition'] = 'attachment; filename=data.gz'
    return response

and the result can be decompressed on the fly:

curl http://127.0.0.1:8000/stream_gzip | gunzip -c | less
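
If you would rather consume the stream from Python instead of curl, here is a minimal sketch (an addition to the answer, assuming the requests library is installed) that decompresses the gzip stream incrementally with zlib.decompressobj():

import zlib
import requests  # assumed to be installed; any streaming HTTP client works

# wbits = 16 + MAX_WBITS tells zlib to expect gzip framing (header + trailer).
decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
with requests.get('http://127.0.0.1:8000/stream_gzip', stream=True) as resp:
    for chunk in resp.iter_content(chunk_size=8192):
        text = decompressor.decompress(chunk)
        if text:
            print(text.decode(), end='')
print(decompressor.flush().decode(), end='')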
Martijn Pieters

While I was extremely impressed by Martijn's solution, I decided to roll my own that uses pigz for better performance:

def yield_pigz(results, compresslevel=1):
    # Pipe every chunk through an external pigz process and yield its
    # compressed output as it becomes available.
    cmd = ['pigz', '-%d' % compresslevel]
    pigz_proc = subprocess.Popen(cmd, bufsize=0,
        stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    # Feed stdin from a separate thread so that reading stdout below
    # cannot deadlock on a full pipe buffer.
    def f():
        for result in results:
            pigz_proc.stdin.write(result)
            pigz_proc.stdin.flush()
        pigz_proc.stdin.close()

    t = threading.Thread(target=f)
    t.start()
    try:
        while True:
            buf = pigz_proc.stdout.read(4096)
            if len(buf) == 0:
                break
            yield buf
    finally:
        t.join()
        pigz_proc.wait()

Keep in mind that you'll need to import subprocess and threading for this to work. You will also need to install the pigz program (already in the repositories of most Linux distributions -- on Ubuntu, just use sudo apt install pigz -y).

Example usage:

from flask import Flask, Response
import subprocess
import threading
import random

app = Flask(__name__)

def yield_something_random():
    for i in range(10000):
        seq = [chr(random.randint(ord('A'), ord('Z'))) for c in range(1000)]
        yield ''.join(seq).encode()  # pigz's stdin expects bytes, not str

@app.route('/')
def index():
    return Response(yield_pigz(yield_something_random()))
d33tah
  • This is pointless, I'm afraid, there is no benefit to be had here. `pigz` works by divvying up the input into separate blocks that can be compressed individually (into 128 KB chunks), but you are feeding it a *stream of data*, and doing so over a pipe. You are not going to be writing data fast enough for pigz to find more than a few blocks, *if that*, while adding the overhead of spinning up a separate process and passing data back and forth over pipes. You are spinning up a separate process for each request as well, which is a real drag on scalability. – Martijn Pieters Jan 23 '20 at 14:25
  • Pigz is great when you have a local file you can read a whole series of chunks from different blocks of the disk, and pass to each compression thread. – Martijn Pieters Jan 23 '20 at 14:27

I think that currently you're just sending the generator instead of the data! You may want to do something like this (I haven't tested it, so it may need some changes):

def generate_zip():
    import io
    # Compress everything into an in-memory buffer, then return it in one go.
    buff = io.BytesIO()
    with gzip.GzipFile(fileobj=buff, mode='wb') as gfile:
        for x in range(10000):
            gfile.write(f"this is my line: {x}\n".encode())
    return buff.getvalue()
Dror Hilman
  • That's the *whole point* of the generator; it is iterated over elsewhere. The issue is with using `zlib.compress()`. Your solution removes the ability to stream altogether. – Martijn Pieters Jun 06 '17 at 10:24

Working generate_zip() with low memory consumption :) :

import io

def generate_zip():
    buff = io.BytesIO()
    gz = gzip.GzipFile(mode='w', fileobj=buff)
    for x in range(10000):
        gz.write(f"this is my line: {x}\n".encode())
        yield buff.read()
        buff.truncate()
    gz.close()
    yield buff.getvalue()
Bartek Jablonski
  • This doesn't actually return any data until the last yield. – mattjvincent Jun 05 '17 at 18:09
  • This yields 10k empty strings, then the whole document. This is also overkill. There is no need to keep separate file objects in memory, just stream the compression directly. If you do want to use a `GzipFile()` object, why not just capture the `write()` calls directly rather than use a `BytesIO()` object? – Martijn Pieters Jun 06 '17 at 10:30
  • Your approach could only be fixed if you used `buff.getvalue()` in the loop (not `buff.read()`) and used `buff.truncate(0)` (you are truncating to the current position, not to the start). To avoid yielding empty blocks, you may want to test if `buff.getvalue()` actually produced anything, and only yield and truncate if it did. – Martijn Pieters Jun 06 '17 at 11:38
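
For completeness, here is a sketch (an editorial addition, not from the answer itself) of the fixed generator described in the comment above. Note that with io.BytesIO in Python 3 you also need buff.seek(0) alongside buff.truncate(0), because truncate() does not move the write position:

import io
import gzip

def generate_zip():
    buff = io.BytesIO()
    gz = gzip.GzipFile(mode='wb', fileobj=buff)
    for x in range(10000):
        gz.write(f"this is my line: {x}\n".encode())
        chunk = buff.getvalue()   # read what the compressor has written so far
        if chunk:                 # only yield (and reset) non-empty blocks
            yield chunk
            buff.seek(0)          # rewind so the next writes start at the front
            buff.truncate(0)
    gz.close()                    # flush remaining data plus the CRC and length
    yield buff.getvalue()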