
The OpenAI chat/completions API supports streaming responses by passing stream=true in the request. The response then arrives in chunks of data rather than as one complete response, similar to how ChatGPT outputs its answers.
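
For reference, each chunk is delivered as a server-sent event whose data field holds a small JSON delta, and the stream ends with a [DONE] marker. Abridged, the raw stream looks roughly like this:

data: {"object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"Hello"}}]}

data: {"object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":" there"}}]}

data: [DONE]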

I have seen similar questions for other languages and tech stacks (like here), but I can't find a full example of how to implement this in Java (or Kotlin) with the OkHttp client.

How do I get OpenAI's API response as a stream? Thanks!

– Helen, Ziad Halabi

3 Answers


I modified the code so that ChatGPT can now carry on a fully streamed conversation in the console via Java.

import com.theokanning.openai.completion.chat.*;
import com.theokanning.openai.service.OpenAiService;
import io.reactivex.Flowable;
import java.util.*;

public class Starter {
    // Create a new OpenAiService instance with the given API key
    public static OpenAiService service = new OpenAiService("Your API Token");

    public static void main(String[] args) {
        System.out.println("Streaming chat completion...");
        Scanner scanner = new Scanner(System.in);
        String userInput = scanner.nextLine();
        // Create a list of ChatMessage objects
        List<ChatMessage> message = new ArrayList<ChatMessage>();
        message.add(new ChatMessage(ChatMessageRole.USER.value(), userInput));
        // Create a ChatCompletionRequest object
        ChatCompletionRequest chatCompletionRequest;
        boolean running = true;
        // Run the loop until the user enters "exit"
        while (running) {
            chatCompletionRequest = ChatCompletionRequest
                    .builder()
                    .model("gpt-3.5-turbo")
                    .messages(message)
                    .n(1)
                    .maxTokens(500)
                    .logitBias(Collections.emptyMap())
                    .build();
            // Create a Flowable object to stream the chat completion
            Flowable<ChatCompletionChunk> flowableResult = service.streamChatCompletion(chatCompletionRequest);
            // Create a StringBuilder object to store the result
            StringBuilder buffer = new StringBuilder();
            // Consume the Flowable with blockingForEach so the whole reply is printed
            // and buffered before we prompt for the next user input
            flowableResult.blockingForEach(chunk -> chunk.getChoices().forEach(choice -> {
                String result = choice.getMessage().getContent();
                if (result != null) {
                    buffer.append(result);
                    System.out.print(result);
                }
            }));
            System.out.println();
            // Get the user input
            userInput = scanner.nextLine();
            // Record the assistant's reply (with the ASSISTANT role, not SYSTEM) and the new user input
            message.add(new ChatMessage(ChatMessageRole.ASSISTANT.value(), buffer.toString()));
            message.add(new ChatMessage(ChatMessageRole.USER.value(), userInput));
            // Exit the loop if the user enters "exit"
            if (userInput.equals("exit")) {
                running = false;
            }
        }
        scanner.close();
        service.shutdownExecutor();
    }
}
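
This uses TheoKanning's openai-java client (OpenAiService / streamChatCompletion). If you don't already have it, the Maven dependency looks roughly like this (the version here is only an assumption; use whatever release is current):

<dependency>
    <groupId>com.theokanning.openai-gpt3-java</groupId>
    <artifactId>service</artifactId>
    <!-- version is an assumption; pick a current release -->
    <version>0.12.0</version>
</dependency>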
– QuinncyPP

This is OkHttp with Java, receiving the streamed response from ChatGPT as server-sent events and forwarding it to the client through a Spring SseEmitter:

 public static void okHttpEvent(SseEmitter emitter, String prompt, String openAiKey) {
        Request request = new Request.Builder()
                .url("https://{open-api-address}/v1/chat/stream/completions?q=" + prompt + "&eid=" + RequestContext.getRequestId())
                .build();
        OkHttpClient okHttpClient = new OkHttpClient.Builder().addInterceptor(new Interceptor() {
                    @NonNull
                    @Override
                    public Response intercept(@NonNull Interceptor.Chain chain) throws IOException {
                        Request original = chain.request();
                        Request request = original.newBuilder()
                                .header("Authorization", "Bearer " + openAiKey)
                                .header("Accept", MediaType.TEXT_EVENT_STREAM.toString())
                                .method(original.method(), original.body())
                                .build();
                        return chain.proceed(request);
                    }
                })
                .connectTimeout(3, TimeUnit.MINUTES)
                .readTimeout(3, TimeUnit.MINUTES)
                .build();

        RealEventSource realEventSource = new RealEventSource(request, new EventSourceListener() {

            @Override
            public void onOpen(EventSource eventSource, Response response) {

            }

            @Override
            public void onEvent(EventSource eventSource, String id, String type, String data) {
                try {
                    HttpUrl url = eventSource.request().url();
                    String reqId = url.queryParameter("eid");
                    SseChatService.getInstance().appendMsg(data, reqId);
                    emitter.send(data);
                } catch (IOException e) {
                    emitter.completeWithError(e);
                    log.error("send sse to client error", e);
                }
            }

            @Override
            public void onClosed(EventSource eventSource) {
                emitter.complete();
            }

            @Override
            public void onFailure(EventSource eventSource, Throwable t, Response response) {
                emitter.completeWithError(t);
                log.error("event source failure", t);
            }
        });

        realEventSource.connect(okHttpClient);
    }
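
For context, EventSource, EventSourceListener and RealEventSource come from OkHttp's SSE module (the com.squareup.okhttp3:okhttp-sse artifact). Below is a minimal sketch of how the method above might be exposed from a Spring MVC controller; the endpoint path, the property name, and the SseClient class name are assumptions, not part of the original answer:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
public class ChatStreamController {

    // assumed property holding your OpenAI key
    @Value("${openai.api-key}")
    private String openAiKey;

    @GetMapping(value = "/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter chatStream(@RequestParam String prompt) {
        // 0L disables the default timeout so long completions are not cut off
        SseEmitter emitter = new SseEmitter(0L);
        // delegate to the okHttpEvent method above (assumed to live in a class named SseClient)
        SseClient.okHttpEvent(emitter, prompt, openAiKey);
        return emitter;
    }
}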
– Dolphin

I have a working example using Spring WebFlux (Flux). It is similar to OkHttp, so hopefully it can help.

The example consumes an OpenAI chat stream and sends it back to the client. Make sure to set the openAIAPIKey variable to your key.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.*;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

import java.nio.charset.StandardCharsets;
import java.util.Map;

// ... class code

public Flux<Object> chatStream(OpenAIChatBody requestBody) {
  try {
    HttpHeaders headers = new HttpHeaders();
    headers.setContentType(MediaType.APPLICATION_JSON);
    headers.setBearerAuth(openAIAPIKey);
    WebClient client = WebClient.create("https://api.openai.com/v1");
    // Send the request to openAI
    return client.post()
      .uri("/chat/completions")
      .headers(httpHeaders -> httpHeaders.addAll(headers))
      .bodyValue(requestBody)
      .retrieve()
      .bodyToFlux(DataBuffer.class)
      .map(dataBuffer -> dataBuffer.toString(StandardCharsets.UTF_8))
      .concatMap(chunk -> {
        String[] lines = chunk.split("\n");
        return Flux.fromArray(lines)
          .filter(line -> !line.trim().isEmpty())
          .map(line -> line.replace("data:", "")
            .replace("[DONE]", "")
            .replace("data: [DONE]", "")
            .trim());
      })
      .filter(data -> !data.isEmpty())
      .concatMap(data -> {
        try {
          Map<String, Object> resultObject = new ObjectMapper().readValue(data, Map.class);
          // return data - you can parse the resultObject to whatever it is that you need
          return Flux.just(resultObject);
        } catch (JsonProcessingException e) {
          // returning null from concatMap would throw an NPE; skip chunks that fail to parse
          return Flux.empty();
        }
    });
  } catch (Exception e) {
    // propagate setup errors to the subscriber instead of returning null
    return Flux.error(e);
  }
}
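
OpenAIChatBody is not shown in the answer, so here is a minimal sketch of what it might look like; the field names are assumptions that simply mirror the OpenAI /v1/chat/completions request schema, and stream must be true or the API returns a single response instead of chunks:

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Map;

// Hypothetical request body; fields mirror the OpenAI chat/completions schema
public class OpenAIChatBody {
    private String model = "gpt-3.5-turbo";
    private List<Map<String, String>> messages; // e.g. [{"role": "user", "content": "Hello"}]
    private boolean stream = true;              // required so the API streams chunks

    @JsonProperty("max_tokens")
    private Integer maxTokens = 500;

    public String getModel() { return model; }
    public List<Map<String, String>> getMessages() { return messages; }
    public boolean isStream() { return stream; }
    public Integer getMaxTokens() { return maxTokens; }
    public void setMessages(List<Map<String, String>> messages) { this.messages = messages; }
}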

You will need to include the following dependencies in the pom.xml file:

<dependencies>
    <!-- ... other dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <!-- This is for Flux to stop displaying a potential DNS warning on Macs -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-resolver-dns-native-macos</artifactId>
        <version>4.1.73.Final</version>
        <classifier>osx-aarch_64</classifier>
    </dependency>
</dependencies>

All of this code can be found in a working file in the following example project.

– Ovidijus Parsiunas