This question is related to this one, in which I asked how to stream data from a Reactive Spring Controller.

As Rossen pointed out, we have to use text/event-stream to send the streamed results back as server-sent events. So far so good.

I have a service like this:

@GetMapping(value="/accounts/alertsStreaming", headers="accept=text/event-stream")
public Flux<Alert> getAccountAlertsStreaming() {
    return Flux.fromArray(new Alert[]{new Alert((long)1, "Alert message"), 
                                      new Alert((long)2, "Alert message2"),
                                      new Alert((long)3, "Alert message3")})
               .delayMillis(1000)
               .log();
}

When I call it from a browser, the 3 results start arriving with a 1-second delay.

I wanted to call this service from a WebClient and implemented it this way:

@Component
public class AccountsServiceClient {

    @Autowired
    private WebClient webClient;

    public Flux<Alert> getAccountAlertsStreaming(String serviceBaseUrl){
        Flux<Alert> response = webClient
                .perform(get(serviceBaseUrl+"/accounts/alertsStreaming").header("Accept", "text/event-stream"))
                .extract(bodyStream(Alert.class));
        return response;
    }       
}

And this is the test code:

@Test
@ContextConfiguration(classes={WebClientConfig.class, AccountsServiceClient.class})
public class AccountsServiceClientTest extends AbstractTestNGSpringContextTests{

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private AccountsServiceClient client;

    public void testNumbersServiceClientStreamingTest() throws InterruptedException{

        CountDownLatch latch = new CountDownLatch(1);

        Flux<Alert> alerts = client.getAccountAlertsStreaming("http://localhost:8080");
        alerts.doOnComplete( () -> {
            latch.countDown();
        }).subscribe( (n) -> {
            logger.info("------------> GOT ALERT {}", n);
        });

        latch.await();
    }
}

The problem is that when the client tries to extract the results as they arrive, none of the HttpMessageReaders can read text/event-stream + Alert.class.

public class ResponseExtractors {

    protected static HttpMessageReader<?> resolveMessageReader(List<HttpMessageReader<?>> messageReaders,
                ResolvableType responseType, MediaType contentType) {

            return messageReaders.stream()
                    .filter(e -> e.canRead(responseType, contentType))
                    .findFirst()
                    .orElseThrow(() ->
                            new WebClientException(
                                    "Could not decode response body of type '" + contentType
                                            + "' with target type '" + responseType.toString() + "'"));
    }

Exception:

reactor.core.Exceptions$BubblingException: org.springframework.web.client.reactive.WebClientException: Could not decode response body of type 'text/event-stream' with target type 'com.codependent.spring5.playground.reactive.dto.Alert'
    at reactor.core.Exceptions.bubble(Exceptions.java:97)
    at reactor.core.Exceptions.onErrorDropped(Exceptions.java:263)
    at reactor.core.publisher.LambdaSubscriber.onError(LambdaSubscriber.java:126)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:183)
    at reactor.core.publisher.MonoFlatMap$FlattenSubscriber.onNext(MonoFlatMap.java:128)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:169)
    at reactor.core.publisher.FluxLog$LoggerSubscriber.doNext(FluxLog.java:161)
    at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:88)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:123)
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:75)
    at reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:103)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1010)
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:70)
    at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:71)
    at reactor.ipc.netty.http.NettyHttpClientHandler.channelRead(NettyHttpClientHandler.java:120)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
    at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:233)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:571)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:512)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:426)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:398)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.web.client.reactive.WebClientException: Could not decode response body of type 'text/event-stream' with target type 'com.codependent.spring5.playground.reactive.dto.Alert'
    at org.springframework.web.client.reactive.ResponseExtractors.lambda$resolveMessageReader$23(ResponseExtractors.java:203)
    at org.springframework.web.client.reactive.ResponseExtractors$$Lambda$61/1950155746.get(Unknown Source)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at org.springframework.web.client.reactive.ResponseExtractors.resolveMessageReader(ResponseExtractors.java:200)
    at org.springframework.web.client.reactive.ResponseExtractors.decodeResponseBody(ResponseExtractors.java:181)
    at org.springframework.web.client.reactive.ResponseExtractors.lambda$null$12(ResponseExtractors.java:89)
    at org.springframework.web.client.reactive.ResponseExtractors$$Lambda$36/70386506.apply(Unknown Source)
    at reactor.core.publisher.MonoFlatMap$FlattenSubscriber.onNext(MonoFlatMap.java:126)
    ... 37 common frames omitted
codependent
  • What's the difference between a WebClient and a web browser from the server's point of view? – Tassos Bassoukos Aug 06 '16 at 06:45
  • I see your point but, apart from the new semantics, if we aren't gonna get the results streamed, but all of them at the same time, what's the point of using the new `WebClient` instead of the old `RestTemplate`? – codependent Aug 06 '16 at 12:27

2 Answers

Maybe this should be handled automatically by the framework. In any case, I solved it by unmarshalling the JSON stream data myself:

WebClientConfig:

@Configuration
public class WebClientConfig {

    @Bean
    public ObjectMapper jacksonObjectMapper(){
        return new ObjectMapper();
    }

    @Bean
    public WebClient webClient(){
        WebClient webClient = new WebClient(new ReactorClientHttpConnector());
        return webClient;
    }

}

Service client:

@Component
public class AccountsServiceClient {

    @Autowired
    private WebClient webClient;

    @Autowired
    private ObjectMapper jacksonObjectMapper;

    public Flux<Alert> getAccountAlertsStreaming(String serviceBaseUrl){
        Flux<Alert> response = webClient
                .perform(get(serviceBaseUrl+"/accounts/alertsStreaming").header("Accept", "text/event-stream"))
                .extract(bodyStream(String.class))
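                .map(e -> {
                    // Drop the SSE field name ("data:") and keep only the JSON payload
                    String json = e.substring(e.indexOf(":") + 1);
                    try {
                        return jacksonObjectMapper.readValue(json, Alert.class);
                    } catch (Exception e1) {
                        // Signal an error: Reactor's map() does not accept a null result
                        throw new IllegalStateException("Could not parse alert: " + json, e1);
                    }
                });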
        return response;
    }

}
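
The substring(indexOf(":") + 1) trick above works as long as every string the client receives is a single "data:" line. As a rough sketch of a more defensive variant, the helper below follows the server-sent events wire format as I understand it: a line starting with ":" is a comment, fields other than "data" are skipped, one leading space after the colon is dropped, several "data" lines of one event are joined with a newline, and a blank line ends the event. SseDataExtractor and extractData are hypothetical names, not part of Spring or Reactor, and the sketch assumes the raw stream is already available as one String.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Spring class): extracts "data" payloads from raw SSE text. */
public final class SseDataExtractor {

    private SseDataExtractor() {
    }

    /** Returns one payload per event; an event is only emitted once a blank line ends it. */
    public static List<String> extractData(String rawStream) {
        List<String> payloads = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;

        // Limit -1 keeps trailing empty strings, so the terminating blank line is seen
        for (String line : rawStream.split("\r\n|\n|\r", -1)) {
            if (line.isEmpty()) {
                // Blank line: the current event is complete
                if (hasData) {
                    payloads.add(data.toString());
                    data.setLength(0);
                    hasData = false;
                }
                continue;
            }
            if (line.startsWith(":")) {
                continue; // comment line, e.g. a keep-alive
            }
            int colon = line.indexOf(':');
            String field = colon < 0 ? line : line.substring(0, colon);
            String value = colon < 0 ? "" : line.substring(colon + 1);
            if (value.startsWith(" ")) {
                value = value.substring(1); // a single leading space is not part of the value
            }
            if (field.equals("data")) {
                if (hasData) {
                    data.append('\n'); // multiple data lines belong to the same event
                }
                data.append(value);
                hasData = true;
            }
            // Other fields (event, id, retry) are ignored in this sketch
        }
        return payloads;
    }
}
```

Each returned payload could then be handed to jacksonObjectMapper.readValue(payload, Alert.class) as in the service client above. An event that is not terminated by a blank line is dropped here, which may or may not be what you want on a connection that closes abruptly.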

UPDATE: As of Spring 5 M4 this is done by the framework. You can check the solution with the new API here: Spring 5 Web Reactive - How can we use WebClient to retrieve streamed data in a Flux?

codependent

There's already an issue for that. Please comment/vote for SPR-14539.

Brian Clozel