
I'm trying to create an endpoint that lets me upload a large file as a stream. I want to receive the file in chunks of a fixed size (CHUNK_SIZE = 1024 * 1024 * 20, i.e. 20 MiB), and I only want the binary data of the file in my chunks.

Example:

async for chunk in request.stream():
    print(chunk)

current result:

b'--8af8cd2412f5572588d978d2a7c51016\r\nContent-Disposition: form-data; name="file"; filename="toto.txt"\r\nContent-Type: text/plain\r\n\r\n'
b'this is my file data! (Can be huge data)\r\n--8af8cd2412f5572588d978d2a7c51016--\r\n'

Result that I want (only the binary data of my file):

b'this is my file data! (Can be huge data)'
# or for large file
b'...' * 999
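For reference, the framing in the "current result" above is just the multipart/form-data wire format. A minimal sketch of how such a body is laid out, using the example boundary and payload from the output above:

```python
# Sketch of the multipart/form-data wire format that request.stream() yields.
# The boundary and payload are the example values from the question.
boundary = b"8af8cd2412f5572588d978d2a7c51016"
file_data = b"this is my file data! (Can be huge data)"

body = (
    b"--" + boundary + b"\r\n"
    + b'Content-Disposition: form-data; name="file"; filename="toto.txt"\r\n'
    + b"Content-Type: text/plain\r\n"
    + b"\r\n"
    + file_data
    + b"\r\n--" + boundary + b"--\r\n"
)

# The stream slices this byte sequence at arbitrary positions, so any chunk
# can carry part of the boundary/header framing alongside the file bytes.
print(body)
```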

If the community can help me, that would be very kind.

I tried several things like:

  • streaming-form-data: I don't want to write the file to my hard drive. I saw I could use ValueTarget to keep it in memory, but when I tried it, it didn't work (for me).
  • I even tried to write my own form-data parser, but it doesn't work at all.
from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/uploads/stream")
async def upload_stream(request: Request):
    filename = request.headers.get('file')
    boundary = request.headers.get('content-type').split("boundary=")[1].encode("utf-8")
    content_type = request.headers.get('content-type').encode("utf-8")

    # Note: `starter` never matches the real wire format -- the opening boundary
    # is followed by "\r\n" before the part headers, and `content_type` here is
    # the whole request Content-Type (including the boundary parameter), not the
    # part's Content-Type. Markers can also be split across two chunks.
    ender = b"\r\n--" + boundary + b"--\r\n"
    starter = b"--" + boundary + b"Content-Disposition: form-data; name=\"file\"; filename=\"" + filename.encode("utf-8") + b"\"\r\nContent-Type: " + content_type + b"\r\n\r\n"
    is_file = False

    async for chunk in request.stream():
        if len(chunk) == 0:
            continue

        if starter in chunk:
            print("starter found")
            print(chunk.split(starter)[1])
            is_file = True

        if ender in chunk:
            print("ender found")
            print(chunk.split(ender)[0])
            is_file = False

        if is_file:
            pass # todo: do stuff with the chunk here


    return {"file": filename}
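One reason the hand-rolled approach above can never be reliable is that a boundary marker may straddle two chunks. Purely for illustration, here is a stdlib-only sketch of a chunk-boundary-safe extractor for a body containing exactly one file part (extract_file_bytes is a hypothetical helper name, not part of any library):

```python
def extract_file_bytes(chunks, boundary: bytes):
    # Hypothetical helper: strips the multipart framing from a single-file
    # upload, buffering a tail so markers split across chunks are still found.
    delim = b"\r\n--" + boundary  # precedes the closing boundary
    header_end = b"\r\n\r\n"      # blank line separating part headers from data

    buf = b""
    in_body = False
    for chunk in chunks:
        buf += chunk
        if not in_body:
            i = buf.find(header_end)
            if i == -1:
                continue  # headers not complete yet
            buf = buf[i + len(header_end):]
            in_body = True
        j = buf.find(delim)
        if j != -1:
            yield buf[:j]  # everything before the closing boundary is payload
            return
        # keep a tail long enough to hold a delimiter split across chunks
        safe = max(0, len(buf) - len(delim))
        if safe:
            yield buf[:safe]
            buf = buf[safe:]
```

This only handles a single part and unbounded header buffering, so it is a sketch of the idea, not a replacement for a real multipart parser.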

SOLUTION:

For this solution, I use the python-multipart package: pip install python-multipart

I've optimized the callbacks with a thread pool, a result collector, and a CHUNK_SIZE threshold, but feel free to update my code or let me know if I've made any mistakes.

from concurrent.futures import ThreadPoolExecutor

from multipart import MultipartParser
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

CHUNK_SIZE = 1024 * 1024 * 20  # flush the buffer every 20 MiB


class UploadFileByStream:
    def __init__(self):
        self.buffer = None
        self.bytes_received = None
        self.executor = ThreadPoolExecutor()
        self.results = []
        self.futures = []

    def on_part_begin(self):
        self.buffer = b""
        self.bytes_received = 0

    def on_part_data(self, data, start, end):
        self.buffer += data[start:end]
        self.bytes_received += end - start

        if self.bytes_received >= CHUNK_SIZE:
            self.send_buffer()

    def on_part_end(self):
        if self.buffer:
            self.send_buffer()

    def send_buffer(self):
        # YOUR_FUNCTION is a placeholder for whatever processes each chunk
        future = self.executor.submit(YOUR_FUNCTION, self.buffer)
        self.futures.append(future)

        self.buffer = b""
        self.bytes_received = 0

    def collect_result(self):
        for future in self.futures:
            try:
                result = future.result()
                self.results.extend(result)
            except Exception as e:
                print("Error while processing chunk:", e)


@app.post("/test/upload")
async def upload_test(request: Request):
    filename = request.headers.get("file")
    content_type = request.headers.get("Content-Type")

    if not content_type or "boundary=" not in content_type:
        return JSONResponse(content="Invalid Content-Type header", status_code=400)

    _, boundary = content_type.split("boundary=")

    file = UploadFileByStream()

    callbacks = {
        'on_part_begin': file.on_part_begin,
        'on_part_data': file.on_part_data,
        'on_part_end': file.on_part_end,
    }

    parser = MultipartParser(boundary, callbacks)

    async for chunk in request.stream():
        parser.write(chunk)

    parser.finalize()
    file.collect_result()

    return {"filename": filename}
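To sanity-check the buffering and flushing logic without running a server or installing python-multipart, the class can be driven by hand in the same order MultipartParser invokes the callbacks. Here, process_chunk is a stand-in for YOUR_FUNCTION, and the tiny CHUNK_SIZE is just for the demo:

```python
from concurrent.futures import ThreadPoolExecutor

CHUNK_SIZE = 8  # tiny threshold for the demo; the real endpoint uses 20 MiB


def process_chunk(buf: bytes):
    # Stand-in for YOUR_FUNCTION: record how many bytes each flush carried.
    return [len(buf)]


class UploadFileByStream:
    def __init__(self):
        self.buffer = None
        self.bytes_received = None
        self.executor = ThreadPoolExecutor()
        self.results = []
        self.futures = []

    def on_part_begin(self):
        self.buffer = b""
        self.bytes_received = 0

    def on_part_data(self, data, start, end):
        self.buffer += data[start:end]
        self.bytes_received += end - start
        if self.bytes_received >= CHUNK_SIZE:
            self.send_buffer()

    def on_part_end(self):
        if self.buffer:
            self.send_buffer()

    def send_buffer(self):
        self.futures.append(self.executor.submit(process_chunk, self.buffer))
        self.buffer = b""
        self.bytes_received = 0

    def collect_result(self):
        for future in self.futures:
            self.results.extend(future.result())


# Drive the callbacks by hand, in the order MultipartParser would:
f = UploadFileByStream()
f.on_part_begin()
f.on_part_data(b"0123456789", 0, 10)  # crosses the threshold -> flushed now
f.on_part_data(b"abc", 0, 3)          # stays buffered
f.on_part_end()                       # flushes the 3-byte remainder
f.collect_result()
print(f.results)  # [10, 3]
```

Each flush over the threshold produces one future, and on_part_end flushes whatever remains, so two process_chunk calls of 10 and 3 bytes are collected.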
YoYoDev
  • I don't think you'll find anything to support that built-in to Starlette/FastAPI - what you're seeing is the raw multipart encoding as the stream is served. Have you looked at something like https://github.com/siddhantgoel/streaming-form-data as an option? (referenced in https://github.com/encode/starlette/issues/849) – MatsLindh Aug 14 '23 at 11:53
  • Hi @MatsLindh, yes, I looked at streaming-form-data and found things like [ValueTarget](https://streaming-form-data.readthedocs.io/en/latest/#valuetarget), but when I tried it, it didn't work — the value stayed empty. Also, I don't know whether ValueTarget lets me keep only the current chunk in memory. Perhaps you know of a framework where it's easier to do this? – YoYoDev Aug 14 '23 at 13:43
