I have the following function, which should:

  • Take a ReadableStream
  • Read it chunk by chunk
  • Before the last chunk (OpenAI signals this with a "[DONE]" string), add an extra chunk containing my extensionPayload

The issue is that the last chunk's data from the original OpenAI stream and my extension data get merged into one chunk. But to process the chunks on the client, I need them to be separate chunks.

// JSONValue is any JSON-serializable value (e.g. the JSONValue type exported by the "ai" package)
export async function extendOpenAIStream(
  openaiStream: ReadableStream<Uint8Array>,
  extensionPayload: JSONValue
) {
  const encoder = new TextEncoder()
  const decoder = new TextDecoder()

  const reader = openaiStream.getReader()

  const stream = new ReadableStream({
    cancel() {
      reader.cancel()
    },
    async start(controller) {
      while (true) {
        const { done, value } = await reader.read()
        const dataString = decoder.decode(value)

        // https://beta.openai.com/docs/api-reference/completions/create#completions/create-stream
        if (done || dataString.includes('[DONE]')) {
          // Enqueue our extension chunk
          const extendedValue = encoder.encode(
            `data: ${JSON.stringify(extensionPayload)}\n\n`
          )
          controller.enqueue(extendedValue)

          // Enqueue the original chunk (skip it when the reader is already done and value is undefined)
          if (value) controller.enqueue(value)

          // Close the stream
          controller.close()
          break
        }
        controller.enqueue(value)
      }
    },
  })

  return stream
}

Expected chunks (separate chunks):

data: {"extensionPayload": {...}}
data: {"id":"...,"object":"chat.completion.chunk","created":1684486791,"model":"gpt-3.5-turbo-0301","choices":[{"delta":{},"index":0,"finish_reason":"stop"}]}

data: [DONE]

Actual chunks (merged into one chunk):

data: {"extensionPayload": {...}}

data: {"id":"...,"object":"chat.completion.chunk","created":1684486791,"model":"gpt-3.5-turbo-0301","choices":[{"delta":{},"index":0,"finish_reason":"stop"}]}

data: [DONE]
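
For context, this is roughly how I read the stream on the client and observe the chunk boundaries (a minimal sketch; the /api/completion route is a placeholder for whatever serves extendOpenAIStream):

const res = await fetch('/api/completion') // placeholder route
const reader = res.body!.getReader()
const decoder = new TextDecoder()

while (true) {
  const { done, value } = await reader.read()
  if (done) break
  // Each resolved read() is one chunk as delivered over the network
  console.log('--- chunk ---')
  console.log(decoder.decode(value, { stream: true }))
}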

1 Answer

If anyone needs to do something similar, I finally have a solution. It turned out that it doesn't matter whether the chunks are split or not, since I'm using eventsource-parser on the client. The real issue was that chunks were getting fragmented. According to Vercel this is expected (see the streaming caveats link in the code below). I'm now using eventsource-parser in the cloud function as well to ensure the chunks are complete:

import { createParser } from 'eventsource-parser'

// encoder, decoder, reader and extensionPayload are set up as in the question
const stream = new ReadableStream({
  cancel() {
    reader.cancel()
  },
  async start(controller) {
    // Chunks might get fragmented, so we use eventsource-parser to reassemble complete events
    // See: https://vercel.com/docs/concepts/functions/edge-functions/streaming#caveats
    const parser = createParser((e) => {
      if (e.type !== 'event') return
      controller.enqueue(encoder.encode(`data: ${e.data}\n\n`))
    })

    while (true) {
      const { done, value } = await reader.read()
      const dataString = decoder.decode(value, { stream: true }) // stream: true keeps multi-byte characters intact across chunks

      // https://beta.openai.com/docs/api-reference/completions/create#completions/create-stream
      if (done || dataString.includes('[DONE]')) {
        // Enqueue our extension chunk
        const extendedValue = encoder.encode(
          `data: ${JSON.stringify(extensionPayload)}\n\n`
        )
        controller.enqueue(extendedValue)

        // Close the stream
        controller.close()
        break
      }
      parser.feed(dataString)
    }
  },
})
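
For completeness, the client can consume the stream with the same parser, so fragmentation doesn't matter there either (a sketch; the route and the payload handling are placeholders):

import { createParser } from 'eventsource-parser'

const res = await fetch('/api/completion') // placeholder route
const reader = res.body!.getReader()
const decoder = new TextDecoder()

const parser = createParser((e) => {
  if (e.type !== 'event' || e.data === '[DONE]') return
  // Each complete event is either an OpenAI chunk or our extensionPayload
  console.log(JSON.parse(e.data))
})

while (true) {
  const { done, value } = await reader.read()
  if (done) break
  parser.feed(decoder.decode(value, { stream: true }))
}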