
I have a requirement to read and write compressed (GZIP) streams without intermediate storage. Currently, I'm using Spring `RestTemplate` to do the writing, and Apache HTTP client to do the reading (see my answer here for an explanation of why `RestTemplate` can't be used for reading large streams). The implementation is fairly straightforward: I slap a `GZIPInputStream` on the response `InputStream` and move on.
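For context, the reading side is essentially the following (a rough sketch assuming Apache HttpClient 4.x; `url` is a placeholder, and automatic content decompression is disabled so the raw gzip bytes reach the `GZIPInputStream`):

CloseableHttpClient client = HttpClients.custom().disableContentCompression().build();
try (CloseableHttpResponse response = client.execute(new HttpGet(url));
     GZIPInputStream in = new GZIPInputStream(response.getEntity().getContent())) {
    // stream-process the decompressed bytes; nothing is buffered in memory or written to disk
}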

Now, I'd like to switch to using Spring 5 `WebClient` (just because I'm not a fan of the status quo). However, `WebClient` is reactive in nature and deals with `Flux<Stuff>`; I believe it's possible to get a `Flux<DataBuffer>`, where `DataBuffer` is an abstraction over `ByteBuffer`. The question is: how do I decompress it on the fly without having to hold the full stream in memory (`OutOfMemoryError`, I'm looking at you) or write it to local disk? It's worth mentioning that `WebClient` uses Netty under the hood.
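(For reference, getting the raw, still-compressed body as a `Flux<DataBuffer>` looks roughly like this; the URL and path are made up:)

Flux<DataBuffer> compressed = WebClient.create("https://example.org")
        .get()
        .uri("/large-gzip-resource")
        .retrieve()
        .bodyToFlux(DataBuffer.class);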

I'll admit to not knowing much about (de)compression. I did my research, but none of the material available online seemed particularly helpful:

compression on java nio direct buffers

Writing GZIP file with nio

Reading a GZIP file from a FileChannel (Java NIO)

(de)compressing files using NIO

Iterable gzip deflate/inflate in Java

Abhijit Sarkar
  • Does just using the `asInputStream()` method on the data buffer and putting that into a `GZIPInputStream` not work? – vandale Jan 01 '18 at 00:43
  • @vandale I don't follow, each `DataBuffer` is only a part, right? `GZIPInputStream` is supposed to operate on a complete `InputStream`, not a bunch of them. There's a way to reconstruct the `InputStream` using `DataBuffer.asInputStream` and `SequenceInputStream`, but that'd be defeating the purpose of using reactive NIO. – Abhijit Sarkar Jan 01 '18 at 01:26
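(To illustrate the `SequenceInputStream` reconstruction mentioned in the comment above, here is a rough, non-reactive sketch, assuming `compressed` is the `Flux<DataBuffer>` from earlier; note that `collectList()` buffers every compressed `DataBuffer` in memory first, which is exactly why it defeats the purpose:)

List<InputStream> parts = compressed.map(DataBuffer::asInputStream).collectList().block();
try (GZIPInputStream gzip = new GZIPInputStream(
        new SequenceInputStream(Collections.enumeration(parts)))) {
    // read the decompressed bytes
}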

2 Answers

What worked for me was a Netty handler that overrides the response headers, so that an `HttpContentDecompressor` placed after it in the pipeline sees `Content-Encoding: gzip` and decompresses the body on the fly:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;

public class HttpResponseHeadersHandler extends ChannelInboundHandlerAdapter {
    private static final Logger log = LoggerFactory.getLogger(HttpResponseHeadersHandler.class);

    private final HttpHeaders httpHeaders;

    public HttpResponseHeadersHandler(HttpHeaders httpHeaders) {
        this.httpHeaders = httpHeaders;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Overwrite the headers of non-informational responses before the message
        // reaches HttpContentDecompressor further down the pipeline.
        if (msg instanceof HttpResponse &&
                !HttpStatus.resolve(((HttpResponse) msg).status().code()).is1xxInformational()) {
            HttpHeaders headers = ((HttpResponse) msg).headers();

            httpHeaders.forEach(e -> {
                log.warn("Modifying {} from: {} to: {}.", e.getKey(), headers.get(e.getKey()), e.getValue());
                headers.set(e.getKey(), e.getValue());
            });
        }
        ctx.fireChannelRead(msg);
    }
}

Then I create a `ClientHttpConnector` to use with the `WebClient`, and in `afterNettyContextInit` add the handler:

ctx.addHandlerLast(new ReadTimeoutHandler(readTimeoutMillis, TimeUnit.MILLISECONDS));
ctx.addHandlerLast(new Slf4JLoggingHandler());
if (forceDecompression) {
    // CONTENT_ENCODING, GZIP, CONTENT_TYPE and APPLICATION_JSON come from Netty's
    // HttpHeaderNames / HttpHeaderValues (static imports).
    io.netty.handler.codec.http.HttpHeaders httpHeaders = new ReadOnlyHttpHeaders(
            true,
            CONTENT_ENCODING, GZIP,
            CONTENT_TYPE, APPLICATION_JSON
    );
    HttpResponseHeadersHandler headersModifier = new HttpResponseHeadersHandler(httpHeaders);
    ctx.addHandlerFirst(headersModifier);
}
// The decompressor must see the rewritten Content-Encoding header, so it sits after the modifier.
ctx.addHandlerLast(new HttpContentDecompressor());

This, of course, would fail for responses that are not GZIP compressed, so I use this instance of WebClient for a particular use case only, where I know for sure that the response is compressed.
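For completeness, consuming the now transparently decompressed body without buffering it all can look something like this (a sketch; the URI and target file are placeholders, and `webClient` is the instance built with the connector above):

Flux<DataBuffer> body = webClient.get()
        .uri("/large-gzip-resource")
        .retrieve()
        .bodyToFlux(DataBuffer.class);

try (OutputStream out = Files.newOutputStream(Paths.get("response.json"))) {
    // DataBufferUtils.write streams each buffer to the OutputStream; release them as they are written.
    DataBufferUtils.write(body, out)
            .map(DataBufferUtils::release)
            .blockLast();
}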

Writing is easy: Spring has a `ResourceEncoder`, so an `InputStream` can simply be wrapped in an `InputStreamResource`, and voilà!
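A rough sketch of that writing side (the endpoint is a placeholder, and `gzippedInput` is assumed to be an `InputStream` of already-compressed bytes produced elsewhere):

webClient.post()
        .uri("/upload")
        .header(HttpHeaders.CONTENT_ENCODING, "gzip")
        .contentType(MediaType.APPLICATION_JSON)
        .body(BodyInserters.fromResource(new InputStreamResource(gzippedInput)))
        .retrieve()
        .bodyToMono(Void.class)
        .block();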

Abhijit Sarkar

Noting this here as it confused me a bit: the API has changed as of Spring 5.1.

I have a similar setup to the accepted answer for the ChannelInboundHandler:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import org.springframework.http.HttpStatus;

public class GzipJsonHeadersHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpResponse
                && !HttpStatus.resolve(((HttpResponse) msg).status().code()).is1xxInformational()) {
            // Replace whatever the server sent with the headers the decompressor needs to see.
            HttpHeaders headers = ((HttpResponse) msg).headers();
            headers.clear();
            headers.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
            headers.set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
        }
        ctx.fireChannelRead(msg);
    }
}

(The header values I needed are just hard-coded there for simplicity; otherwise it's identical.)

Registering it, however, is different:

WebClient.builder()
    .clientConnector(
            new ReactorClientHttpConnector(
                    HttpClient.from(
                            TcpClient.create()
                                    .doOnConnected(c -> {
                                        // Both are added "first", so the headers handler ends up in front
                                        // of the decompressor and rewrites the headers before decompression.
                                        c.addHandlerFirst(new HttpContentDecompressor());
                                        c.addHandlerFirst(new GzipJsonHeadersHandler());
                                    })
                    ).compress(true)
            )
    )
    .build();

It seems Netty now maintains a list of user-added handlers separate from (and after) the system handlers, and `addHandlerFirst()` only puts your handler at the front of that user list. You therefore need to add `HttpContentDecompressor` explicitly to ensure it runs after your handler that inserts the correct headers.

Michael Berry
  • I'm not seeing the difference. In my answer, `HttpContentDecompressor` was being added using an `addHandlerLast` call. – Abhijit Sarkar Dec 03 '19 at 01:13
  • @AbhijitSarkar Oh, right you are. The only difference is where it's added in that case - like membersound I struggled to see where to put it, then found you use the above snippet rather than `afterNettyContextInit()` in recent versions. If you want to add that snippet to your answer I'll delete this one (not interested in the rep, just wanted the complete code sample to be out there.) – Michael Berry Dec 03 '19 at 08:13