I am trying to demonstrate the advantages of using Reactive Streams in Spring MVC. For that I have a small Jetty server running with two end-points:
/normal returns a POJO
/flux returns the same object wrapped in a Mono
Then I start a client and launch a few thousand simultaneous requests at one of these endpoints. I expected to see fewer errors with the second endpoint, where the processing happens asynchronously. However, I sometimes observe more errors on the async-enabled endpoint; in both cases, anywhere between 60% and 90% of the requests fail with Connection refused: no further information.
Either I'm doing something wrong here or I don't quite understand what is going on. Connection refused is just the kind of thing I would expect to avoid.
Server
Here is my code from the server. In the normal case I literally block the thread with a .sleep():
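import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

// @RestController (rather than @Controller) so the returned Map is written as the JSON body
@RestController
public class FluxController {
    // Blocking variant: holds the request thread for the whole delay.
    @GetMapping(value = "/normal", produces = MediaType.APPLICATION_JSON_VALUE)
    public Map<String, String> normal() throws Exception {
        Thread.sleep(randomTime());
        return Collections.singletonMap("type", "normal");
    }

    // Asynchronous variant: the delay runs on a Reactor timer, not on the request thread.
    @GetMapping(value = "/flux", produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<Map<String, String>> flux() {
        return Mono.delay(Duration.ofMillis(randomTime()))
                .map(x -> Collections.singletonMap("type", "flux"));
    }

    private static long randomTime() {
        return ThreadLocalRandom.current().nextLong(200, 1000);
    }
}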
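The difference between the two handlers can be sketched without Spring, using only the JDK. This is a rough illustration under stated assumptions, not a model of how Jetty is configured here: a fixed pool of two threads stands in for the container's worker threads, a single scheduler thread stands in for a timer-based delay such as Mono.delay, and the class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the server above.
final class BlockingVsDelayed {

    /** Runs `tasks` sleeping tasks on a pool of `threads` threads; returns elapsed milliseconds. */
    public static long runBlocking(int tasks, int threads, long delayMs) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        long start = System.nanoTime();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            // Each task occupies one pool thread for the full delay, like Thread.sleep in /normal.
            futures.add(CompletableFuture.runAsync(() -> sleep(delayMs), pool));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        pool.shutdown();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /** Completes `tasks` futures from a single timer thread after `delayMs`; returns elapsed milliseconds. */
    public static long runDelayed(int tasks, long delayMs) throws Exception {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        long start = System.nanoTime();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            CompletableFuture<Void> done = new CompletableFuture<>();
            // No thread waits here: the timer only fires a short callback when the delay is over.
            timer.schedule(() -> done.complete(null), delayMs, TimeUnit.MILLISECONDS);
            futures.add(done);
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        timer.shutdown();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("blocking: " + runBlocking(8, 2, 100) + " ms");
        System.out.println("delayed:  " + runDelayed(8, 100) + " ms");
    }
}
```

With eight tasks and two pool threads, the blocking variant has to run the sleeps in four batches, while the delayed variant finishes all eight after roughly one delay. Note that this only concerns how long worker threads stay occupied; it says nothing about whether new connections are accepted in the first place.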
The server is running on Jetty 9.4.15 through Maven and the web.xml is defined with:
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd" version="3.1">
Client
My client uses Spring's WebClient:
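import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

public class ClientApplication {
    private static final String ENDPOINT = "normal";
    private static final int REPETITIONS = 10_000;

    public static void main(String[] args) {
        WebClient client = WebClient.create("http://localhost:8080");
        AtomicInteger errors = new AtomicInteger(0);
        // Build one request per repetition; nothing is sent until subscription.
        List<Mono<Response>> responses = IntStream.range(0, REPETITIONS)
                .mapToObj(i -> client.get()
                        .uri(ENDPOINT)
                        .retrieve()
                        .bodyToMono(Response.class)
                        .doOnError(e -> errors.incrementAndGet())  // count every failed request
                        .onErrorResume(e -> Mono.empty())          // swallow the error so the rest keep running
                )
                .collect(Collectors.toList());
        // Subscribe to all requests and wait until every one has completed.
        Mono.when(responses)
                .block();
        System.out.println(String.format("%.2f %% errors", errors.get() * 100.0 / REPETITIONS));
    }

    static class Response {
        public String type;
    }
}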
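The client above only keeps a single error counter. One way to confirm that the failures really are connection errors would be to tally them by root cause instead. The following is a minimal, JDK-only sketch; ErrorTally is a hypothetical helper and not part of the original client.

```java
import java.net.ConnectException;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper: thread-safe count of failures grouped by root cause.
final class ErrorTally {
    private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

    /** Records one failure, keyed by the root cause's simple class name and message. */
    public void record(Throwable error) {
        Throwable root = error;
        // Walk down the cause chain so wrapper exceptions do not hide the real reason.
        while (root.getCause() != null) {
            root = root.getCause();
        }
        String key = root.getClass().getSimpleName() + ": " + root.getMessage();
        counts.computeIfAbsent(key, k -> new LongAdder()).increment();
    }

    /** Returns a snapshot of the counts, sorted by key. */
    public Map<String, Long> snapshot() {
        Map<String, Long> result = new TreeMap<>();
        counts.forEach((key, adder) -> result.put(key, adder.sum()));
        return result;
    }

    public static void main(String[] args) {
        ErrorTally tally = new ErrorTally();
        tally.record(new RuntimeException("wrapper", new ConnectException("Connection refused")));
        tally.record(new ConnectException("Connection refused"));
        tally.record(new TimeoutException("timed out"));
        System.out.println(tally.snapshot());
    }
}
```

In the client, this could be wired in with .doOnError(tally::record) next to the existing counter, and tally.snapshot() printed after block() returns.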
This has a similar premise to the question here: WebFlux async processing. The main difference is that I am testing the error rate, that is, how many simultaneous connections the server can handle; I don't expect a speed increase.