I have seen many questions about chunked streams in Netty, but most of them deal with outbound streams, not inbound ones.

I would like to understand how I can take the data from the channel and hand it to my business logic as an InputStream, without loading all of it into memory first. Here is what I tried:

public class ServerRequestHandler extends MessageToMessageDecoder<HttpObject> {

  private HttpServletRequest request;
  private PipedOutputStream os;
  private PipedInputStream is;

  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    super.handlerAdded(ctx);
    this.os = new PipedOutputStream();
    this.is = new PipedInputStream(os);
  }

  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    super.handlerRemoved(ctx);
    this.os.close();
    this.is.close();
  }

  @Override
  protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
      throws Exception {
    if (msg instanceof HttpRequest) {
      this.request = new CustomHttpRequest((HttpRequest) msg, this.is);
      out.add(this.request);
    }
    if (msg instanceof HttpContent) {
      ByteBuf body = ((HttpContent) msg).content();

      if (body.readableBytes() > 0)
        body.readBytes(os, body.readableBytes());

      if (msg instanceof LastHttpContent) {
        os.close();
      }
    }

  }

}

I then have another handler that receives my CustomHttpRequest and passes it to what I call a ServiceHandler, where my business logic reads from the InputStream.

public class ServiceRouterHandler extends SimpleChannelInboundHandler<CustomHttpRequest> {
...
    @Override
    public void channelRead0(ChannelHandlerContext ctx, CustomHttpRequest request) throws IOException {
...
        future = serviceHandler.handle(request, response);
...

This does not work: when my handler forwards the CustomHttpRequest to the ServiceHandler and it tries to read from the InputStream, the read blocks the very thread that would deliver the next HttpContent, so the content is never handled by my decoder.

I know I could run my business logic on a separate thread, but I have the impression I am overcomplicating things.
I looked at ByteBufInputStream, but its documentation says:

Please note that it only reads up to the number of readable bytes determined at the moment of construction.

So I don't think it will work for chunked HTTP requests. I also saw ChunkedWriteHandler, which seems fine for outbound chunks, but I couldn't find anything like a ChunkedReadHandler...

So my question is: what is the best way to do this? My requirements are:

- Do not keep the data in memory before sending it to the ServiceHandlers;
- The ServiceHandlers API should be Netty agnostic (that's why I use my CustomHttpRequest instead of Netty's HttpRequest).

UPDATE: I got this to work using a more reactive approach in CustomHttpRequest. The request no longer exposes an InputStream for the ServiceHandlers to read from (which was blocking). Instead, CustomHttpRequest now has a readInto(OutputStream) method that returns a Future, and the service handler's logic runs only once that future completes, that is, after the whole body has been written to the OutputStream. Here is what it looks like:

public class CustomHttpRequest {
  ...constructors and other methods hidden...
  private final SettableFuture<Void> writeCompleteFuture = SettableFuture.create();

  private final SettableFuture<OutputStream> outputStreamFuture = SettableFuture.create();

  private ListenableFuture<Void> lastWriteFuture = Futures.transform(outputStreamFuture, x-> null);

  public ListenableFuture<Void> readInto(OutputStream os) throws IOException {
    outputStreamFuture.set(os);
    return this.writeCompleteFuture;
  }

  ListenableFuture<Void> writeChunk(byte[] buf) {
    this.lastWriteFuture = Futures.transform(lastWriteFuture, (AsyncFunction<Void, Void>) (os) -> {
      outputStreamFuture.get().write(buf);
      return Futures.immediateFuture(null);
    });
    return lastWriteFuture;
  }


  void complete() {
    ListenableFuture<Void> future =
        Futures.transform(lastWriteFuture, (AsyncFunction<Void, Void>) x -> {
          outputStreamFuture.get().close();
          return Futures.immediateFuture(null);
        });
    addFinallyCallback(future, () -> {
      this.writeCompleteFuture.set(null);
    });

  }
}

And my updated ServerRequestHandler looks like this:

public class ServerRequestHandler extends MessageToMessageDecoder<HttpObject> {

  private CustomHttpRequest request;

  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    super.handlerAdded(ctx);
  }

  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    super.handlerRemoved(ctx);
  }


  @Override
  protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
      throws Exception {
    if (msg instanceof HttpRequest) {
      HttpRequest request = (HttpRequest) msg;

      this.request = new CustomHttpRequest(request, ctx.channel());

      out.add(this.request);
    }
    if (msg instanceof HttpContent) {
      ByteBuf buf = ((HttpContent) msg).content();
      byte[] bytes = new byte[buf.readableBytes()];
      buf.readBytes(bytes);

      this.request.writeChunk(bytes);

      if (msg instanceof LastHttpContent) {
        this.request.complete();
      }
    }
  }
}

This works pretty well. Note, however, that everything here still runs on a single thread, so for large payloads I might want to hand the work off to another thread and free this one for other channels.
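
The same readInto idea can be sketched with only JDK classes, swapping Guava's SettableFuture for CompletableFuture. This is not the code above, just a minimal illustration under assumptions: the class name ChunkedBodySketch and the demo method are hypothetical, and all calls are assumed to come from one thread (as they do on a Netty event loop), so the lastWrite field is not synchronized.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

// Hypothetical JDK-only analogue of CustomHttpRequest's body handling.
class ChunkedBodySketch {
  // Completed once the consumer supplies its OutputStream.
  private final CompletableFuture<OutputStream> sink = new CompletableFuture<>();
  // Tail of the write chain; each stage runs only after the previous one.
  private CompletableFuture<OutputStream> lastWrite = sink;
  // Completed after the last chunk is written and the stream is closed.
  private final CompletableFuture<Void> done = new CompletableFuture<>();

  CompletableFuture<Void> readInto(OutputStream os) {
    sink.complete(os); // releases any chunks queued before this call
    return done;
  }

  void writeChunk(byte[] chunk) {
    lastWrite = lastWrite.thenApply(os -> {
      try {
        os.write(chunk);
        return os;
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    });
  }

  void complete() {
    lastWrite.whenComplete((os, error) -> {
      if (error != null) {
        done.completeExceptionally(error); // a write failed; stream is not closed here
        return;
      }
      try {
        os.close();
        done.complete(null);
      } catch (IOException e) {
        done.completeExceptionally(e);
      }
    });
  }

  // Small demonstration: one chunk arrives before the consumer, one after.
  static String demo() {
    ChunkedBodySketch body = new ChunkedBodySketch();
    ByteArrayOutputStream target = new ByteArrayOutputStream();
    body.writeChunk("hello ".getBytes(StandardCharsets.UTF_8));
    CompletableFuture<Void> finished = body.readInto(target);
    body.writeChunk("world".getBytes(StandardCharsets.UTF_8));
    body.complete();
    finished.join();
    return new String(target.toByteArray(), StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

One caveat this makes visible: chunks that arrive before readInto is called are held by the pending stages of the chain, so they do sit in memory until a consumer shows up.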

  • Does a regular InputStream, or OutputStream not work? PipedInput/OutputStream says "Attempting to use both objects from a single thread is not recommended as it may deadlock the thread" right in the docs. – aglassman Feb 07 '18 at 17:04
  • Well, you are right about the deadlock... But for now, my solution would imply that I need another thread for my ServiceHandler, in which case they would be used on different threads. But I need an InputStream on my ServiceHandler, whereas I believe I need to write to this InputStream on my netty handler somehow, and the way I found to do this was using Piped... Other ways would be using a ByteArrayInputStream, but I'd have memory issues, or ByteBufInputStream, for which I have concerns I mentioned in the question. Any other ideas? – Murilo Tavares Feb 07 '18 at 19:19
  • I would say try BufferedInput/OutputStreams. – aglassman Feb 07 '18 at 19:52
  • I'm also worried about ServerRequestHandler. Unless this class is created per request, this will fail as soon as you get more than one request. – aglassman Feb 07 '18 at 19:56
  • @aglassman I can't see how BufferedInputStream would help here. – Murilo Tavares Feb 07 '18 at 20:37
  • @aglassman Regarding the ServerRequestHandler concern, maybe you are not familiar with netty or I'm missing something here. In Netty, each handler is only called by a single thread per design, so I believe this is safe. – Murilo Tavares Feb 07 '18 at 20:40
  • Never mind, I just got your point regarding the ServerRequestHandler, and indeed that's an issue I'll need to handle by resetting the state when I receive the LastHttpContent. Thanks for pointing this out. But still, I think my main concerns are still valid. – Murilo Tavares Feb 07 '18 at 21:10
  • Well, good luck. I've exhausted my limited knowledge of Netty! – aglassman Feb 07 '18 at 23:12

1 Answer

You're on the right track - if your serviceHandler.handle(request, response); call is doing a blocking read, you need to create a new thread for it. Remember, there are supposed to be only a small number of Netty worker threads, so you shouldn't do any blocking calls in worker threads.
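
As a minimal sketch of that hand-off, using only JDK classes and no Netty types: the calling thread below stands in for the Netty worker thread and writes chunks into a PipedOutputStream, while a separate executor thread does the blocking read. The names PipedOffloadSketch, blockingHandle and streamThroughPipe are hypothetical, and the 16-byte pipe size is chosen only to keep the buffer small.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PipedOffloadSketch {

  // Hypothetical stand-in for a blocking service handler: drains the stream.
  static String blockingHandle(InputStream in) throws IOException {
    ByteArrayOutputStream collected = new ByteArrayOutputStream();
    byte[] buf = new byte[8];
    int n;
    while ((n = in.read(buf)) != -1) { // blocks until data or end of stream
      collected.write(buf, 0, n);
    }
    return new String(collected.toByteArray(), StandardCharsets.UTF_8);
  }

  // The caller plays the role of the I/O thread; the reader runs elsewhere.
  static String streamThroughPipe(List<String> chunks) {
    ExecutorService businessPool = Executors.newSingleThreadExecutor();
    try (PipedOutputStream os = new PipedOutputStream();
         PipedInputStream is = new PipedInputStream(os, 16)) {
      // Start the blocking reader on its own thread before writing anything.
      Future<String> result = businessPool.submit(() -> blockingHandle(is));
      for (String chunk : chunks) {
        os.write(chunk.getBytes(StandardCharsets.UTF_8));
      }
      os.close(); // signals end of stream, like LastHttpContent
      return result.get();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      businessPool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(streamThroughPipe(Arrays.asList("chunk-1;", "chunk-2;")));
  }
}
```

Keep in mind that the writer blocks whenever the pipe buffer is full, so with a slow consumer the writing thread would still stall. The sketch only shows why the reader and the writer have to live on different threads.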

The other question to ask is: does your service handler need to be blocking? What does it do? If it's shoveling the data over the network anyway, can you incorporate it into the Netty pipeline in a non-blocking way? That way, everything is async all the way, with no blocking calls or extra threads required.

spinlok
  • Thanks for your response. I just updated my question with a solution where the service handler will not block while reading. But I'm curious about what you mean by "create a Netty pipeline for it". – Murilo Tavares Feb 14 '18 at 16:19
  • Sorry, that wasn't clear. I just edited it to "can you incorporate it into the Netty pipeline". Basically, my point is that you need blocking reads with an InputStream only if you're interfacing with libs that work that way. If you're in full control of your application, just use channels/handlers the Netty way. – spinlok Feb 14 '18 at 21:17
  • This is part of migrating an existing server to Netty. This server allows developers to create their own service handlers. With that in mind, we prefer to keep the service handler API Netty agnostic, by creating a Netty handler that works as a dispatcher for the service handlers. So adding each service handler to the Netty pipeline is not much of an option for me. – Murilo Tavares Feb 15 '18 at 21:36