I'm trying to create an endpoint that allows me to download a large file using a stream. But I want to get chunks of the file with a size specified as (CHUNK_SIZE = 1024 * 1024 * 20). Furthermore, I only want the binary data of the file in my chunks.
Example:
async for chunk in request.stream():
print(chunk)
Current result:
b'--8af8cd2412f5572588d978d2a7c51016\r\nContent-Disposition: form-data; name="file"; filename="toto.txt"\r\nContent-Type: text/plain\r\n\r\n'
b'this is my file data! (Can be huge data)\r\n--8af8cd2412f5572588d978d2a7c51016--\r\n'
Result that I want (only the binary data of my file):
b'this is my file data! (Can be huge data)'
# or for large file
b'...' * 999
If the community can help me, that would be very kind.
I tried several things like:
- streaming_form_data: but I don't need to create the file on my hard drive; I saw I could use `ValueTarget`-style targets, but that didn't work (for me).
- I even tried to make my own form data parser, but it doesn't work at all.
from fastapi import Request
@app.post("/uploads/stream")
async def upload_stream(request: Request):
    # Hand-rolled attempt to strip the multipart framing from a streamed
    # upload so only the raw file bytes remain.
    # NOTE(review): this approach has several defects (flagged below);
    # a real multipart parser is needed instead.
    filename = request.headers.get('file')
    # Boundary token lifted from the request's Content-Type header.
    boundary = request.headers.get('content-type').split("boundary=")[1].encode("utf-8")
    # NOTE(review): this is the REQUEST content type ("multipart/form-data;
    # boundary=..."), not the part's own type (the sample stream shows
    # "Content-Type: text/plain"), so `starter` below can never match.
    content_type = request.headers.get('content-type').encode("utf-8")
    ender = b"\r\n--" + boundary + b"--\r\n"
    # NOTE(review): the sample stream shows "--boundary\r\nContent-Disposition"
    # — the "\r\n" between the boundary and "Content-Disposition" is missing
    # here, so this marker also never matches.
    starter = b"--" + boundary + b"Content-Disposition: form-data; name=\"file\"; filename=\"" + filename.encode("utf-8") + b"\"\r\nContent-Type: " + content_type + b"\r\n\r\n"
    is_file = False
    async for chunk in request.stream():
        if len(chunk) == 0:
            continue
        # NOTE(review): `starter`/`ender` can straddle two consecutive chunks,
        # in which case neither `in` test ever fires for that marker.
        if starter in chunk:
            print("starter found")
            print(chunk.split(starter)[1])
            is_file = True
        if ender in chunk:
            print("ender found")
            print(chunk.split(ender)[0])
            is_file = False
        if is_file:
            pass  # todo: do stuff with the chunk here
    return {"file": filename}
SOLUTION:
For this solution, I use python multipart:
pip install python-multipart
I've optimized the callbacks with a thread pool, a result collector, and a CHUNK_SIZE limit — but feel free to update my code, or let me know if I've made any mistakes.
from concurrent.futures import ThreadPoolExecutor
from multipart import MultipartParser
from fastapi import Request
from fastapi.responses import JSONResponse
CHUNK_SIZE = 1024 * 1024 * 20  # 20 MiB: flush the part buffer to a worker once it reaches this size


class UploadFileByStream:
    """Accumulates multipart part data from python-multipart callbacks and
    hands CHUNK_SIZE-sized buffers to a thread pool for processing.

    Attributes:
        buffer: bytes accumulated for the current part (None before the first part begins).
        bytes_received: number of bytes in `buffer` (None before the first part begins).
        results: flattened results gathered from completed worker futures.
        futures: futures returned by the executor, in submission order.
    """

    def __init__(self):
        self.buffer = None
        self.bytes_received = None
        self.executor = ThreadPoolExecutor()
        self.results = []
        self.futures = []

    def on_part_begin(self):
        """Callback: a new multipart part starts — reset the accumulator."""
        # bytearray gives amortized O(1) appends; repeated `bytes += ...`
        # copies the whole buffer on every chunk (quadratic overall).
        self.buffer = bytearray()
        self.bytes_received = 0

    def on_part_data(self, data, start, end):
        """Callback: `data[start:end]` is the next slice of the part body."""
        self.buffer += data[start:end]
        self.bytes_received += end - start
        if self.bytes_received >= CHUNK_SIZE:
            self.send_buffer()

    def on_part_end(self):
        """Callback: the current part is complete — flush whatever is left."""
        if self.buffer:
            self.send_buffer()

    def send_buffer(self):
        """Submit the current buffer to the pool and start a fresh one."""
        # Snapshot as immutable bytes so the worker never observes later
        # mutations of the accumulator.
        future = self.executor.submit(YOUR_FUNCTION, bytes(self.buffer))
        self.futures.append(future)
        self.buffer = bytearray()
        self.bytes_received = 0

    def collect_result(self):
        """Block until every submitted chunk is processed, gathering results.

        Failures are logged per-future so one bad chunk does not discard
        the results of the others.
        """
        for future in self.futures:
            try:
                result = future.result()
                self.results.extend(result)
            except Exception as e:
                print("Error while processing chunk:", e)
        # Release the worker threads; otherwise each request leaks a pool.
        self.executor.shutdown(wait=True)
@app.post("/test/upload")
async def upload_test(request: Request):
    """Stream a multipart upload through UploadFileByStream, processing the
    file body in CHUNK_SIZE pieces without buffering the whole file.

    Returns:
        {"filename": ...} on success, or a 400 JSONResponse when the
        Content-Type header carries no multipart boundary.
    """
    filename = request.headers.get("file")
    content_type = request.headers.get("Content-Type")
    if not content_type or "boundary=" not in content_type:
        return JSONResponse(content="Invalid Content-Type header", status_code=400)
    # RFC 2046: the boundary parameter may be quoted and may be followed by
    # further ";"-separated parameters — keep only the bare token.
    _, _, boundary = content_type.partition("boundary=")
    boundary = boundary.split(";")[0].strip().strip('"')
    file = UploadFileByStream()
    callbacks = {
        'on_part_begin': file.on_part_begin,
        'on_part_data': file.on_part_data,
        'on_part_end': file.on_part_end,
    }
    parser = MultipartParser(boundary, callbacks)
    async for chunk in request.stream():
        parser.write(chunk)
    # Flush any bytes the parser is still holding for the final part;
    # without this the last part may never reach on_part_end.
    parser.finalize()
    file.collect_result()
    return {"filename": filename}