
I am currently working on a way to integrate the openai-node package into my Next.js application. Because of the long generation times of OpenAI completions, I want to make use of streaming (which the package does not officially support, see here). But I have a workaround that works flawlessly:

const response = await openai.createChatCompletion({
  messages: [
    {
      role: 'user',
      content: 'hello there!'
    }
  ],
  model: 'gpt-3.5-turbo',
  temperature: 0.85,
  stream: true
}, { responseType: 'stream' })

let activeRole = '';

response.data.on('data', (data: Buffer) => {

  // A single SSE chunk can contain multiple "data: ..." lines.
  const lines = data.toString().split('\n').filter(line => line.trim() !== '');
  lines.forEach((line) => {

    const message = line.replace(/^data: /, '');
    if (message === '[DONE]') return;

    const parsed: OpenAIStreamChunk = JSON.parse(message);
    if (parsed.choices[0].finish_reason === 'stop') return;

    // The role only arrives in the first delta, so remember it.
    activeRole = parsed.choices[0].delta.role ?? activeRole;

    const chunk = {
      role: activeRole,
      content: parsed.choices[0].delta.content ?? 'EMPTY'
    };

    console.log(chunk);
  });

});
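For reference, `OpenAIStreamChunk` is a hand-rolled type, not something exported by openai-node. A minimal sketch of such a declaration, matching the shape of the raw SSE chunks used above (the exact field list is my assumption), could look like this:

```typescript
// Hand-rolled type for one raw streamed chat-completion chunk.
// Not part of openai-node; the fields mirror the raw SSE payload.
interface OpenAIStreamChunk {
  id: string;
  object: string;
  created: number;
  model: string;
  choices: {
    delta: { role?: string; content?: string };
    index: number;
    finish_reason: string | null;
  }[];
}
```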

Now I want to use the above code to transfer this stream from my API route to my front-end, so that the stream gets parsed there. I use tRPC in my application, and have created the following thus far:

// Backend code:

import { Readable } from 'stream';
import { z } from 'zod';
import { initOpenAI } from 'lib/ai';
// `router` and `authenticatedProcedure` come from our tRPC setup.

export const analysisRouter = router({

  generate: authenticatedProcedure
    .input(z.object({
    }))
    .mutation( async ({ ctx, input }) => {

      const stream = new Readable({ objectMode: true})
      const openai = initOpenAI()

      const result = await openai.createChatCompletion({
        messages: [
          {
            role: 'user',
            content: 'hello there!'
          }
        ],
        model: 'gpt-3.5-turbo',
        temperature: 0.85,
        stream: true
      }, { responseType: 'stream' })

      let activeRole = '';

      result.data.on('data', (data: Buffer) => {

        const lines = data.toString().split('\n').filter(line => line.trim() !== '');
        for (const line of lines) {

          const message = line.replace(/^data: /, '');
          if (message === '[DONE]') return;

          const parsed: OpenAIStreamChunk = JSON.parse(message);
          if (parsed.choices[0].finish_reason === 'stop') return;

          activeRole = parsed.choices[0].delta.role ?? activeRole;

          if (parsed.choices[0].delta.content) {
            stream.push({
              role: activeRole,
              content: parsed.choices[0].delta.content
            });
          }
        }
      });

      return stream
    })
})


// Front-end code, that reads the stream (this is in React)

useEffect(() => {
  (async () => {

    const stream = await trpc.analysis.generate.mutate({ type: 'lesson'})
    stream.on('data', (data) => { console.log(data) })
    
  })()
}, [])

Except the code above doesn't work properly: the events on the client-side stream are empty, and stream.on is not recognized as a valid function. I know that what tRPC returns here is technically not a stream, but can someone point out the obvious to me? What do I need to change to support streaming through tRPC? Is this even an option?

Thanks in advance.

thim24

1 Answer


As of 13 April, tRPC supports JSON responses only.

I asked a core team member about this in a chat yesterday.

There’s an open discussion on that topic, but it’s unclear whether streaming will ever be supported in tRPC.
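A common workaround (a sketch, not something tRPC provides) is to keep tRPC for regular procedures but serve the stream from a plain Next.js API route, forwarding the OpenAI SSE chunks to the browser and reading them there with `fetch` and a `ReadableStream` reader. The route path and the helper name below are my own invention:

```typescript
// pages/api/generate.ts -- a plain API route, outside tRPC (sketch):
//
//   export default async function handler(req, res) {
//     res.setHeader('Content-Type', 'text/event-stream');
//     const result = await openai.createChatCompletion(
//       { /* ...same options as above, stream: true */ },
//       { responseType: 'stream' }
//     );
//     result.data.pipe(res); // forward the SSE bytes as-is
//   }

// Shared helper (hypothetical name): extract the JSON payloads from a
// raw SSE chunk, mirroring the line-splitting logic in the question.
function extractSSEPayloads(raw: string): string[] {
  return raw
    .split('\n')
    .filter((line) => line.trim() !== '')
    .map((line) => line.replace(/^data: /, ''))
    .filter((message) => message !== '[DONE]');
}

// Client side: read the route with fetch instead of a tRPC mutation:
//
//   const res = await fetch('/api/generate', { method: 'POST' });
//   const reader = res.body!.getReader();
//   const decoder = new TextDecoder();
//   while (true) {
//     const { done, value } = await reader.read();
//     if (done) break;
//     for (const message of extractSSEPayloads(decoder.decode(value))) {
//       console.log(JSON.parse(message));
//     }
//   }
```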

p6l-richard