I have a working example using the Flux
module, it is similar to OkHttp
so hopefully it can help.
The example consumes an Open AI chat stream and sends it back to the client. Make sure to set the openAIAPIKey
variable to your key.
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.core.io.buffer.DataBuffer;
import reactor.core.publisher.Flux;
import org.springframework.http.*;
// ... class code
public Flux<Object> chatStream(OpenAIChatBody requestBody) {
try {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.setBearerAuth(openAIAPIKey);
WebClient client = WebClient.create("https://api.openai.com/v1");
// Send the request to openAI
return client.post()
.uri("/chat/completions")
.headers(httpHeaders -> httpHeaders.addAll(headers))
.bodyValue(requestBody)
.retrieve()
.bodyToFlux(DataBuffer.class)
.map(dataBuffer -> dataBuffer.toString(StandardCharsets.UTF_8))
.concatMap(chunk -> {
String[] lines = chunk.split("\n");
return Flux.fromArray(lines)
.filter(line -> !line.trim().isEmpty())
.map(line -> line.replace("data:", "")
.replace("[DONE]", "")
.replace("data: [DONE]", "")
.trim());
})
.filter(data -> !data.isEmpty())
.concatMap(data -> {
try {
Map<String, Object> resultObject = new ObjectMapper().readValue(data, Map.class);
// return data - you can parse the resultObject to whatever it is that you need
return Flux.just(resultObject);
} catch (JsonProcessingException e) {
return null;
}
});
} catch (Exception e) {
return null;
}
}
You will need to include the following dependencies in the pom.xml
file:
<dependencies>
<!-- ... other dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- This is for flux to stop displaying a potential DNS warning for MACs -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver-dns-native-macos</artifactId>
<version>4.1.73.Final</version>
<classifier>osx-aarch_64</classifier>
</dependency>
</dependencies>
All of this code can be found in a working file in the following example project.