I am currently working on a way to implement the openai-node
package into my Next.js application. Because of the long generation times of OpenAI completions, I want to make use of streaming (which is normally not supported inside the package, see here). But I have a workaround that works flawlessly, in the following code:
const response = await openai.createChatCompletion({
messages: [
{
role: 'user',
content: 'hello there!'
}
],
model: 'gpt-3.5-turbo',
temperature: 0.85,
stream: true
}, { responseType: 'stream' })
let activeRole = '';
response.data.on('data', (data: Buffer) => {
const lines = data.toString().split('\n').filter(line => line.trim() !== '');
lines.forEach(( line, idx, arr ) => {
const message = line.replace(/^data: /, '');
if(message === '[DONE]') return
const parsed:OpenAIStreamChunk = JSON.parse(message);
if(parsed.choices[0].finish_reason == 'stop') return
activeRole = parsed.choices[0].delta.role ?? activeRole
const chunk = {
role: activeRole,
content: parsed.choices[0].delta.content ?? 'EMPTY'
}
console.log(chunk)
})
});
Now, I want to use the above code to transfer this stream from my API route to my front-end (that the stream gets parsed there). I make use of tRPC
inside my application, and have created this thusfar:
// Backend code:
import { Readable } from 'stream';
import { initOpenAI } from 'lib/ai';
export const analysisRouter = router({
generate: authenticatedProcedure
.input(z.object({
}))
.mutation( async ({ ctx, input }) => {
const stream = new Readable({ objectMode: true})
const openai = initOpenAI()
const result = await openai.createChatCompletion({
messages: [
{
role: 'user',
content: 'hello there!'
}
],
model: 'gpt-3.5-turbo',
temperature: 0.85,
stream: true
}, { responseType: 'stream' })
let activeRole = '';
result.data.on('data', (data: Buffer) => {
const lines = data.toString().split('\n').filter(line => line.trim() !== '');
for(const line of lines) {
const message = line.replace(/^data: /, '');
if(message === '[DONE]') return
const parsed:OpenAIStreamChunk = JSON.parse(message);
if(parsed.choices[0].finish_reason == 'stop') return
activeRole = parsed.choices[0].delta.role ?? activeRole
if(parsed.choices[0].delta.content) {
stream.push({
role: activeRole,
content: parsed.choices[0].delta.content
})
}
}
});
return stream
})
})
// Front-end code, that reads the stream (this is in React)
useEffect(() => {
(async () => {
const stream = await trpc.analysis.generate.mutate({ type: 'lesson'})
stream.on('data', (data) => { console.log(data) })
})()
}, [])
Except the following code above doesn't work properly. The events on the client-side stream are empty, and stream.on
is not recognized as a valid function. I know that this tRPC code is technically not a stream, but can someone point me out the obvious? What do I need to change to support streaming through tRPC? Is this even an option?
Thanks in advance.