4

I am trying to return a response of a picture from S3. In StreamingResponse.stream_response I see, that chunks are read from the stream and sent to the socket. But on the other side, nothing comes.

That's what I see in the browser

import uvicorn

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

from aiobotocore.session import get_session
from aioboto3 import session

app = FastAPI()


@app.get("/")
async def main():

    sess = get_session()
    async with sess.create_client(
            's3',
            endpoint_url="",
            aws_secret_access_key="",
            aws_access_key_id=""
    ) as client:
        result = await client.get_object(Bucket="", Key="")

        return StreamingResponse(result["Body"], media_type="image/jpeg")


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)```
Chris
  • 18,724
  • 6
  • 46
  • 80
Soltanat
  • 51
  • 1
  • 3
  • Please have a look at [this answer](https://stackoverflow.com/a/71639658/17865804) on how to return an image in bytes and display the image in a web browser. – Chris May 05 '22 at 15:35

4 Answers4

6

just as an addition, if you use boto3 you don't need to write a wrapper around the StreamingResponse but can just pass the result of result["Body"].iter_chunks() as content:

import boto3
import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse

app = FastAPI()

s3 = boto3.client("s3")


@app.get("/")
async def main():
    try:
        result = s3.get_object(Bucket="my_awesome_bucket", Key="path/to/file")
        return StreamingResponse(content=result["Body"].iter_chunks())
    except Exception as e:
        if hasattr(e, "message"):
            raise HTTPException(
                status_code=e.message["response"]["Error"]["Code"],
                detail=e.message["response"]["Error"]["Message"],
            )
        else:
            raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)

Did not try it with aioboto3 yet

Peter Csala
  • 17,736
  • 16
  • 35
  • 75
1

After return from main, boto3 client context closing. And what I see in send it is only buffer data. It's sample work for me.

import typing
import uvicorn

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

from aiobotocore.session import get_session
from starlette.background import BackgroundTask

app = FastAPI()


class S3Stream(StreamingResponse):
    def __init__(
            self,
            content: typing.Any,
            status_code: int = 200,
            headers: dict = None,
            media_type: str = None,
            background: BackgroundTask = None,
            Key: str = None, Bucket: str = None
    ) -> None:
        super(S3Stream, self).__init__(content, status_code, headers, media_type, background)
        self.Key = Key
        self.Bucket = Bucket

    async def stream_response(self, send) -> None:
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            }
        )

        sess = get_session()
        async with sess.create_client(
                's3',
                endpoint_url="",
                aws_secret_access_key="",
                aws_access_key_id=""
        ) as client:

            result = await client.get_object(Bucket=self.Bucket, Key=self.Key)

            async for chunk in result["Body"]:
                if not isinstance(chunk, bytes):
                    chunk = chunk.encode(self.charset)

                await send({"type": "http.response.body", "body": chunk, "more_body": True})

        await send({"type": "http.response.body", "body": b"", "more_body": False})


@app.get("/")
async def main():
    return S3Stream(content=None, Bucket="test", Key="priroda_kartinki_foto_03.jpeg")


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)
Sean McGrath
  • 80
  • 1
  • 10
Soltanat
  • 51
  • 1
  • 3
1

On my side I manage to have it work by simply doing:

async def get_by_uuid(uuid: str):
    async def s3_stream():
        async with session.client("s3") as s3:
            result_object = await s3.get_object(
                Bucket=s3_bucket_name,
                Key=f'{uuid}.json',
                )
            async for chunk in result_object['Body']:
                yield chunk
    return StreamingResponse(s3_stream())
repié
  • 182
  • 2
  • 18
0

You can try this example:

async def s3_client(
    settings: Settings = fastapi.Depends(get_settings),
) -> AsyncGenerator[aiobotocore.client.AioBaseClient, None]:
    session = aiobotocore.session.AioSession()
    async with session.create_client(
        "s3",
        endpoint_url=settings.aws_endpoint_url,
        aws_access_key_id=settings.aws_access_key_id,
        aws_secret_access_key=settings.aws_secret_access_key,
        verify=settings.aws_tls_verify,
    ) as client:
        yield client

def s3_client_sync(
    settings: Settings = fastapi.Depends(get_settings),
) -> botocore.client.BaseClient:
    session = botocore.session.Session()
    return session.create_client(
        "s3",
        endpoint_url=settings.aws_endpoint_url,
        aws_access_key_id=settings.aws_access_key_id,
        aws_secret_access_key=settings.aws_secret_access_key,
        verify=settings.aws_tls_verify,
    )

@router.get(
    "/tasks/{task_id}/image",
    summary="Download Image",
    response_class=StreamingResponse,
)
async def get_image(
    task_id: int = Path(..., title="Task ID"),
    settings: Settings = fastapi.Depends(get_settings),
    s3_client: aiobotocore.client.AioBaseClient = fastapi.Depends(s3_client),
    # s3_client_sync: botocore.client.BaseClient = fastapi.Depends(s3_client_sync),
):

    s3_bucket = settings.s3_image_bucket
    s3_key = f"test/{task_id}/image.jpg"
    try:
        image = await s3_client.get_object(Bucket=s3_bucket, Key=s3_key)
        # image = s3_client_sync.get_object(Bucket=s3_bucket, Key=s3_key)
    except Exception:
        raise HTTPException(status_code=500, detail="Failed to get the image")

    return StreamingResponse(
        content=image["Body"],  # 非同期イテレータ
        media_type="image/jpeg",
        headers={"content-length": str(image["ContentLength"]), "etag": image["ETag"]},
    )

https://engineers.safie.link/entry/2022/11/14/fastapi-streaming-response

BacNV
  • 1
  • 2
  • As it’s currently written, your answer is unclear. Please [edit] to add additional details that will help others understand how this addresses the question asked. You can find more information on how to write good answers [in the help center](/help/how-to-answer). – Community Dec 12 '22 at 01:07