
I want to stream a response from the OpenAI API directly to my FastAPI endpoint.

Code:

In my threads_handler.py, which is in a separate folder, I have the following function askQuestion():

def askQuestion(self, collection_id, question):
    collection_name = "collection-" + str(collection_id)
    self.llm = ChatOpenAI(
        model_name=self.model_name,
        temperature=self.temperature,
        openai_api_key=os.environ.get('OPENAI_API_KEY'),
        streaming=True,
        verbose=VERBOSE,
        callback_manager=CallbackManager([MyCustomHandler()]),
    )
    self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True, output_key='answer')

    chroma_Vectorstore = Chroma(collection_name=collection_name, embedding_function=self.embeddingsOpenAi, client=self.chroma_client)

    self.chain = ConversationalRetrievalChain.from_llm(
        self.llm,
        chroma_Vectorstore.as_retriever(similarity_search_with_score=True),
        return_source_documents=True,
        verbose=VERBOSE,
        memory=self.memory,
    )

    result = self.chain({"question": question})
    return result['answer']

As you can see, I specify callback_manager=CallbackManager([MyCustomHandler()]). I have defined the handler here:

import queue
import sys
from typing import Any

from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

q = queue.Queue()
stop_item = "###finish###"

class MyCustomHandler(StreamingStdOutCallbackHandler):
    def on_llm_start(
        self, serialized: dict[str, Any], prompts: list[str], **kwargs: Any
    ) -> None:
        """Run when LLM starts running."""
        with q.mutex:
            q.queue.clear()

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Run on new LLM token. Only available when streaming is enabled."""
        sys.stdout.write(token)
        sys.stdout.flush()
        q.put(token)  # make the token available outside the callback
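For reference, a queue-based handler like this is usually paired with something that drains the queue while the chain runs in the background; otherwise the tokens only ever reach stdout. The sketch below illustrates that pattern under my own assumptions: the background thread, the stream_answer generator, and using stop_item as an end-of-stream sentinel are not part of the original code, and thread_handler is assumed to be the same object used in the route further down.

import threading

def stream_answer(collection_id, question):
    """Hypothetical generator that yields tokens as the callback queues them."""
    def run_chain():
        # askQuestion blocks until the full answer is ready; while it runs,
        # MyCustomHandler.on_llm_new_token keeps pushing tokens into q.
        thread_handler.askQuestion(collection_id, question)
        q.put(stop_item)  # signal the end of the stream

    threading.Thread(target=run_chain, daemon=True).start()

    while True:
        token = q.get()  # blocks until the next token arrives
        if token == stop_item:
            break
        yield token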

The actual route definition in FastAPI is here:

@router.post("/collection/{collection_id}/ask_question")
async def ask_question(collection_id: str, request: Request):
    try:
        form_data = await request.form()
        question = form_data["question"]

        def generate_tokens():  
            result = thread_handler.askQuestion(collection_id, question)
            for token in result:
                yield token
        return StreamingResponse(generate_tokens(), media_type="application/json")

    except requests.exceptions.ConnectionError as e:
        raise HTTPException(status_code=500, detail="Error connecting to the server")
    except Exception as e:
        raise HTTPException(status_code=404, detail=str(e))

The streaming just doesn't work: the response is being streamed in the terminal (via the handler's writes to stdout), but I want it streamed directly to the API response.

I test the streaming with the following API call in the terminal:

curl -q -N -X POST -F 'question=What are the aspects in data science?' http://0.0.0.0/api/v1/collection/52/ask_question
devZ

1 Answer

I can see that you have formed and returned a StreamingResponse from FastAPI; however, I feel you haven't considered that you may also need to change how the client consumes the response. Try changing your request as shown below, and check for the output in your console.

import requests

url = 'your endpoint here'
headers = {
    'accept': 'application/json'
}
# The FastAPI route reads the question from form data, so send it as form data
data = {
    'question': 'your question here'
}

# stream=True keeps requests from buffering the whole body before returning
response = requests.post(url, headers=headers, data=data, stream=True)

# Print chunks as they arrive instead of waiting for the full response
for chunk in response.iter_content(chunk_size=1024):
    if chunk:
        print(chunk.decode('utf-8'), end="", flush=True)
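The stream=True flag is the important part here: without it, requests reads the entire response body before returning, so you would only ever see the completed answer. With stream=True, iter_content yields chunks as the server sends them.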

Vivek