7

I am using LangChain with a Gradio interface in Python. I have built a conversational agent and am trying to stream its responses to the Gradio chatbot interface. I looked through the LangChain docs and could not find an example that implements streaming with agents. Here are the relevant parts of my code:

# Loading the LLM
def load_llm():
    return AzureChatOpenAI(
        temperature=hparams["temperature"],
        top_p=hparams["top_p"],
        max_tokens=hparams["max_tokens"],
        presence_penalty=hparams["presence_penalty"],
        frequency_penalty=hparams["freq_penaulty"],
        streaming=True, 
        callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]), 
        verbose=True,
        model_name=hparams["model"],
        deployment_name = models_dict[hparams["model"]],
        )

# Loading the agent
def load_chain(memory, sys_msg, llm):
    """Logic for loading the chain you want to use should go here."""
    agent_chain = initialize_agent(tools, 
                                   llm, 
                                   agent="conversational-react-description", 
                                   verbose=True, 
                                   memory=memory, 
                                   agent_kwargs = {"added_prompt": sys_msg},
                                   streaming=True, 
                                   )
    return agent_chain

# Creating the chatbot to be used in Gradio.
class ChatWrapper:

    def __init__(self, sys_msg):
        self.lock = Lock()
        self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True,)
        self.chain = load_chain(self.memory, sys_msg, load_llm())
        self.sysmsg = sys_msg
    def __call__(
        self, api_key: str, inp: str, history: Optional[Tuple[str, str]], chain: Optional[ConversationChain]
    ):
        """Execute the chat functionality."""
        self.lock.acquire()
        try:
            history = history or []
            # Run chain and append input.
            output = self.chain.run(input=inp)
            
            history.append((inp, output))
        except Exception as e:
            raise e
        finally:
            self.lock.release()
        return history, history

I can currently stream the output to the terminal, but what I am looking for is streaming in my Gradio interface.

Can you please help me with that?

MRF
  • from langchain.chat_models import ChatOpenAI from langchain.schema import HumanMessage from langchain.callbacks.base import CallbackManager from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler chat = ChatOpenAI(streaming=True, callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]), verbose=True, temperature=0) resp = chat([HumanMessage(content="Write me a song about sparkling water.")]) – Fuad Fouad Jul 01 '23 at 11:07
  • https://python.langchain.com/docs/modules/model_io/models/chat/how_to/streaming StreamingStdOutCallbackHandler – Fuad Fouad Jul 01 '23 at 11:08

3 Answers

5

One possible solution is to use a queue as a mediator.

  1. Create a queue
from queue import SimpleQueue
q = SimpleQueue()
  2. Create a custom callback that writes the produced tokens into the queue
from queue import Empty  # raised by q.get(block=False) on an empty queue
from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult
from typing import Any, Dict, List, Union


job_done = object() # signals the processing is done

class StreamingGradioCallbackHandler(BaseCallbackHandler):
    def __init__(self, q: SimpleQueue):
        self.q = q

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Run when LLM starts running. Clean the queue."""
        while not self.q.empty():
            try:
                self.q.get(block=False)
            except Empty:
                continue

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Run on new LLM token. Only available when streaming is enabled."""
        self.q.put(token)

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when LLM ends running."""
        self.q.put(job_done)

    def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> None:
        """Run when LLM errors."""
        self.q.put(job_done)
  3. Give the callback to your LLM
callback_manager=CallbackManager([StreamingGradioCallbackHandler(q),
                                  StreamingStdOutCallbackHandler()]), 
  4. In the Gradio code, create a parallel thread that runs your agent, and read from the queue.

I don't understand your ChatWrapper. Actually, I am not familiar with Gradio, so I will rely on an example from the documentation.

from threading import Thread

def bot(history):
    user_question = history[-1][0]
    thread = Thread(target=chain.run, kwargs={"input": user_question})
    thread.start()
    history[-1][1] = ""
    while True:
        next_token = q.get(block=True) # Blocks until an input is available
        if next_token is job_done:
            break
        history[-1][1] += next_token
        yield history
    thread.join()
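
One caveat with the handler above: on_llm_end fires after every LLM call, and an agent may call the LLM several times in one run, so the reader loop can stop after the first call. Below is a minimal, LangChain-free sketch of one way around that, where the worker thread itself enqueues the sentinel when the whole run finishes. The names fake_agent_run, run_and_signal and stream_reply are hypothetical stand-ins, not part of LangChain or Gradio; fake_agent_run only imitates what chain.run plus on_llm_new_token would do.

```python
from queue import SimpleQueue
from threading import Thread
from typing import Iterator, List

JOB_DONE = object()  # sentinel: the whole run has finished


def fake_agent_run(q: SimpleQueue, llm_calls: List[List[str]]) -> None:
    """Hypothetical stand-in for chain.run: each inner list is one LLM call."""
    for tokens in llm_calls:
        for token in tokens:
            q.put(token)  # what on_llm_new_token would do


def run_and_signal(q: SimpleQueue, llm_calls: List[List[str]]) -> None:
    """Run the agent, then always enqueue the sentinel, even on error."""
    try:
        fake_agent_run(q, llm_calls)
    finally:
        q.put(JOB_DONE)


def stream_reply(llm_calls: List[List[str]]) -> Iterator[str]:
    """Yield the partial reply after every token, as a generator for the UI."""
    q: SimpleQueue = SimpleQueue()  # one queue per request, not a shared global
    worker = Thread(target=run_and_signal, args=(q, llm_calls))
    worker.start()
    partial = ""
    while True:
        item = q.get()  # blocks until the worker produces something
        if item is JOB_DONE:
            break
        partial += item
        yield partial
    worker.join()


# Two simulated LLM calls within a single agent run.
chunks = list(stream_reply([["Think", "ing "], ["Hel", "lo"]]))
```

With a real agent, this would presumably mean creating the queue and the callback handler per request and dropping the job_done puts from on_llm_end and on_llm_error; that wiring is an assumption and is not shown here.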
Nokados
0

To stream an agent’s response in Langchain, you can use a StreamingStdOutCallbackHandler callback.

Here is an example of how to use it:

from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage

chat = ChatOpenAI(
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()],
    temperature=0
)

resp = chat([HumanMessage(content="Write me a song about sparkling water.")])
Codemaker2015
-1

If you can write on stdout, why don't you also read from it?

import subprocess

def listen(cmd):  # cmd = "python your_langchain.py"
    """from http://blog.kagesenshi.org/2008/02/teeing-python-subprocesspopen-output.html
    """
    # text=True makes readline() return str, so the '' end-of-stream check works on Python 3
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    stdout = []
    while True:
        line = p.stdout.readline()
        stdout.append(line)
        print(line, end="")
        if line == '' and p.poll() is not None:
            break
    return ''.join(stdout)

From https://www.saltycrane.com/blog/2009/10/how-capture-stdout-in-real-time-python/
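
For a streaming UI, the same idea probably needs to be a generator rather than a function that returns once the process exits. The following is a rough sketch under that assumption; stream_lines, accumulate and demo_cmd are hypothetical names, and the child process is a trivial Python one-liner standing in for the LangChain script. Note that this reads line by line, so tokens printed without a trailing newline would only show up once the line is complete.

```python
import subprocess
import sys
from typing import Iterator, List


def stream_lines(cmd: List[str]) -> Iterator[str]:
    """Yield each output line of cmd as soon as the child process writes it."""
    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,   # decode bytes to str
        bufsize=1,   # line-buffered on the parent side
    ) as proc:
        for line in proc.stdout:
            yield line


def accumulate(cmd: List[str]) -> Iterator[str]:
    """Yield the growing transcript, one snapshot per received line."""
    text = ""
    for line in stream_lines(cmd):
        text += line
        yield text


# Hypothetical child process; -u disables output buffering in the child.
demo_cmd = [sys.executable, "-u", "-c", "print('Hello'); print('world')"]
snapshots = list(accumulate(demo_cmd))
```

Each snapshot could then be yielded from the Gradio callback in place of the final string.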

mknull
  • Yea that's a way around but I thought there might be a way to properly implement it using on_llm_new_token – MRF Apr 27 '23 at 17:32
  • If you had to write your own, you would anyways want a buffer where the information is held until Gradio listens in. Since the info is already there, IMO you might as well just use stdout and call it a day. – mknull Apr 27 '23 at 17:56