
I've been looking for hints on how best to test Spring MVC controller methods that return an SseEmitter. I have come up pretty short, but I have a trial-and-error solution that tests against asynchronous, threaded behavior. The code below is only a sample to demonstrate the concept, so there may be a typo or two:

Controller Class:

@Autowired
Publisher<MyResponse> responsePublisher;

@RequestMapping("/mypath")
public SseEmitter index() throws IOException {
    SseEmitter emitter = new SseEmitter();
    Observable<MyResponse> responseObservable = RxReactiveStreams.toObservable(responsePublisher);

    responseObservable.subscribe(
            response -> {
                try {
                    emitter.send(response);
                } catch (IOException ex) {
                    emitter.completeWithError(ex);
                }
            },
            error -> {
                emitter.completeWithError(error);
            },
            emitter::complete
    );

    return emitter;
}

Test Class:

//A threaded dummy publisher to demonstrate async properties.
//Sends 2 responses with a 250ms pause in between.
protected static class MockPublisher implements Publisher<MyResponse> {
    @Override
    public void subscribe(Subscriber<? super MyResponse> subscriber) {
        new Thread() {
            @Override
            public void run() {
                try {
                    subscriber.onNext(response1);
                    Thread.sleep(250);
                    subscriber.onNext(response2);
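                } catch (InterruptedException ex) {
                    // Restore the interrupt flag instead of swallowing it silently.
                    Thread.currentThread().interrupt();
                }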
                subscriber.onComplete();
            }
        }.start();
    }
}

//Assume @Configuration that autowires the above mock publisher in the controller.

//Tests the output of the controller method.
@Test
public void testSseEmitter() throws Exception {
    String path = "http://localhost/mypath/";
    String expectedContent = "data:" + response1.toString() + "\n\n" +
                             "data:" + response2.toString() + "\n\n";

    //Trial-and-Error attempts at testing this SseEmitter mechanism have yielded the following:
    //- Returning an SseEmitter triggers 'asyncStarted'
    //- Calling 'asyncResult' forces the test to wait for the process to complete
    //- However, there is no actual 'asyncResult' to test.  Instead, the content is checked for the published data.
    mockMvc.perform(get(path).contentType(MediaType.ALL))
        .andExpect(status().isOk())
        .andExpect(request().asyncStarted())
        .andExpect(request().asyncResult(nullValue()))
        .andExpect(header().string("Content-Type", "text/event-stream"))
        .andExpect(content().string(expectedContent));
}

As noted in the comments, asyncResult() is called to ensure that the publisher finishes its work and sends both responses before the test completes. Without it, the content check fails because only the first response is present in the content at that point. However, there is no actual result to check, hence the expected asyncResult is null.
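The same waiting behavior can be illustrated outside Spring. The following is only a plain-Java sketch under stated assumptions, not a Spring test: the class `AsyncWaitSketch`, the method `collectAfterCompletion`, and the string payloads are all hypothetical stand-ins for the controller, the threaded publisher, and the responses. It shows that the caller has to block until the background thread signals completion before the body contains every `data:` event.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; none of these names come from Spring.
class AsyncWaitSketch {

    // Writes each payload as an SSE-style "data:" event from a background
    // thread, pausing between events, and returns the body only after the
    // thread has signalled completion.
    static String collectAfterCompletion(List<String> payloads, long pauseMillis)
            throws InterruptedException {
        StringBuffer body = new StringBuffer();      // thread-safe buffer
        CountDownLatch done = new CountDownLatch(1); // completion signal

        Thread worker = new Thread(() -> {
            try {
                for (String payload : payloads) {
                    body.append("data:").append(payload).append("\n\n");
                    Thread.sleep(pauseMillis);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();
            }
        });
        worker.setDaemon(true);
        worker.start();

        // Without this wait, the caller could read the buffer while only the
        // first event has been written.
        if (!done.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("stream did not complete in time");
        }
        return body.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.print(collectAfterCompletion(List.of("first", "second"), 250));
    }
}
```

In the MockMvc test, the asyncResult() check plays the role that the latch plays in this sketch.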

My specific question is whether there is a better, more precise way to force the test to wait for the async process to finish, rather than the kludgy method used here of waiting for a non-existent asyncResult. My broader question is whether there are other libraries or Spring methods better suited to this than these async functions. Thanks!

Kevin Page
  • I don't know if there is a better way - but your kludge helped me test a controller that returned a `SseEmitter` using plain Spring MVC instead of Spring WebFlux - so thanks! – Erin Drummond Jun 04 '19 at 23:10
  • I think this is a more reliable way to control server-side behavior: https://stackoverflow.com/a/51564293/3640794 – xmcax Sep 11 '20 at 09:12

1 Answer

This is a more general answer: it is meant to test an SseEmitter that would otherwise run forever, by disconnecting from the SSE stream after a given timeout.

As for an approach other than MVC, as @ErinDrummond mentioned in a comment on the question, you might want to investigate WebFlux.

The example below is minimal. You may want to expand it by adding headers to the request, using different matchers, or processing the stream output separately.

It schedules a delayed task that triggers the timeout listeners on the async context, which disconnects from the SSE stream and lets the test go on to perform its assertions.

@Autowired
MockMvc mockMvc;


@Test
public void testSseEmitter() throws Exception {
    // Needs static imports of MockMvcRequestBuilders.get/asyncDispatch,
    // MockMvcResultMatchers.request/content and Hamcrest's containsString.
    ScheduledExecutorService execService = Executors.newScheduledThreadPool(1);
    String streamUri = "/your-get-uri";
    long timeout = 500L;
    TimeUnit timeUnit = TimeUnit.MILLISECONDS;

    MvcResult result = mockMvc.perform(get(streamUri))
            .andExpect(request().asyncStarted())
            .andReturn();

    // After the timeout, notify every async listener so the stream is disconnected.
    MockAsyncContext asyncContext = (MockAsyncContext) result.getRequest().getAsyncContext();
    execService.schedule(() -> {
        for (AsyncListener listener : asyncContext.getListeners()) {
            try {
                listener.onTimeout(null);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }, timeout, timeUnit);

    // Blocks until the async processing has finished.
    result.getAsyncResult();
    execService.shutdown();

    // assertions, e.g. response body as string contains "xyz"
    mockMvc.perform(asyncDispatch(result)).andExpect(content().string(containsString("xyz")));
}
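The delayed-disconnect idea can also be looked at in isolation. What follows is a plain-Java sketch under stated assumptions rather than Spring code: `DelayedDisconnectSketch`, `streamUntilTimeout`, and the `tick` payload are hypothetical names, and an `AtomicBoolean` flag stands in for the timeout listeners. An endless "stream" keeps producing events until a task scheduled on a `ScheduledExecutorService` flips the flag, after which the caller can assert on what was collected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration only; none of these names come from Spring.
class DelayedDisconnectSketch {

    // Runs a stream that would never end on its own, disconnects it after
    // timeoutMillis, and returns everything emitted up to that point.
    static String streamUntilTimeout(long timeoutMillis) throws InterruptedException {
        StringBuffer body = new StringBuffer();
        AtomicBoolean disconnected = new AtomicBoolean(false);
        CountDownLatch finished = new CountDownLatch(1);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        Thread stream = new Thread(() -> {
            try {
                do {
                    body.append("data:tick\n\n");
                    Thread.sleep(10);
                } while (!disconnected.get());
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            } finally {
                finished.countDown();
            }
        });
        stream.setDaemon(true);
        stream.start();

        // The delayed task plays the role of the scheduled onTimeout call.
        scheduler.schedule(() -> disconnected.set(true), timeoutMillis, TimeUnit.MILLISECONDS);
        try {
            if (!finished.await(timeoutMillis + 5000, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("stream was not disconnected in time");
            }
        } finally {
            scheduler.shutdownNow();
        }
        return body.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.print(streamUntilTimeout(100));
    }
}
```

The number of events collected depends on timing, so assertions in this style are better written as "contains" or "starts with" checks than as exact matches.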
webMac
p m
  • I'm sorry, but this test is not working for me. When I add a further assertion of .andExpect(status().isOk()) to the assertions at the end, the test fails because it returns status 503 instead of 200 (OK). Furthermore, how can I mock the content it is expected to contain (e.g. "xyz")? – webMac Jul 26 '21 at 13:30