
I read here that EntityUtils.consume(httpEntity) results in releasing the connection back to the connection pool, but when I looked at the source code, I couldn't understand how that happens. Can someone please point me to the part of the code where EntityUtils.consume(httpEntity) or EntityUtils.toString(httpEntity) releases the connection when using the low-level Elasticsearch REST client?

What happens to the connection if there is a SocketTimeoutException and I don't consume the HttpEntity?

aran
A.D

1 Answer


Client-side close and connection release to the Pool (steps)

  1. EntityUtils.consume & EntityUtils.toString > the first one calls close() on the instream if the entity is streaming (entity.isStreaming()). The second one always calls instream.close() in its finally clause. instream is the name given to the InputStream variable.

  2. instream.close() > In this example, the InputStream implementation is a ContentInputStream. Its close() method forces the ContentInputStream to be read to its end, using the loop shown in the code snippet.

    Subsequent reads from this stream will hit end of stream (EOF).

    @Override
    public void close() throws IOException
    {
      final byte[] tmp = new byte[1024];
      /* loop until read() returns -1, i.e. advance the buffer to its end */
      while (this.buffer.read(tmp, 0, tmp.length) >= 0) {}
      super.close();
    }
    
  3. Pool > Checks the status of all pooled resources. This check may be triggered by certain actions (such as a new request), or it may be run by background threads. If a resource/stream was closed by the other end, the pool will get an EOF exception (as the buffer was forced to advance to its end). The spot is marked as invalid.

  4. Pool > All invalid spots are recycled. The pool removes the closed streams and creates new ones, or restores the existing ones without having to erase and re-create them (depending on the resource type). This means the spot that was holding the stream is available again, with a new stream ready to be used:

    The connection is released back to the pool. The other end is not using it anymore, so the pool has full control of it. The pool is now free to erase it, restore it, and assign it to another requester.
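The drain-on-close behaviour from step 2 can be modelled with plain java.io classes. This is a minimal sketch, not the real ContentInputStream: DrainingInputStream and DrainOnCloseDemo are hypothetical names, and a ByteArrayInputStream stands in for the content buffer. It shows that close() keeps reading until read() returns -1, so after close() the buffer is at its end and any further read reports end of stream.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical model of ContentInputStream's drain-on-close; not Apache code.
class DrainOnCloseDemo {

    /** Wraps a content buffer and drains it when close() is called. */
    static final class DrainingInputStream extends FilterInputStream {
        DrainingInputStream(InputStream buffer) {
            super(buffer);
        }

        @Override
        public void close() throws IOException {
            final byte[] tmp = new byte[1024];
            // Same loop shape as ContentInputStream.close(): read until -1.
            while (in.read(tmp, 0, tmp.length) >= 0) {
                // discard the unread remainder of the body
            }
            super.close();
        }
    }

    /** Returns {bytes left before close, bytes left after close, next read()}. */
    static int[] drainOnClose() {
        ByteArrayInputStream buffer = new ByteArrayInputStream(new byte[5000]);
        InputStream stream = new DrainingInputStream(buffer);
        try {
            stream.read(new byte[10]);          // the caller reads only 10 bytes
            int before = buffer.available();    // 4990 bytes still unread
            stream.close();                     // drains the remainder
            int after = buffer.available();     // 0: the buffer is at its end
            int nextRead = stream.read();       // -1: end of stream
            return new int[] {before, after, nextRead};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = drainOnClose();
        System.out.println("before=" + r[0] + " after=" + r[1] + " nextRead=" + r[2]);
    }
}
```

In this sketch the end of stream shows up as read() returning -1; how a real pool entry turns that state into "closed" depends on its implementation.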


Example

Let's imagine a Pool that manages 3 resources, such as HttpConnections. Three threads are already using this pool, so all the spots are occupied.

Meanwhile, ThreadZ is waiting for a connection to be released back to the Pool:

 (spot1) [HttpConn1] -- ThreadA
 (spot2) [HttpConn2] -- ThreadB
 (spot3) [HttpConn3] -- ThreadC

ThreadA finishes its job and closes its connection. The Pool will notice this when the status of the PoolEntry is closed. Different PoolEntry implementations check this in different ways, one of them being getting an EOF exception while trying to read from a stream. Other implementations could have different mechanisms to check whether the resource is closed. If the PoolEntry reports that its resource is closed/invalid, the Pool will recycle this spot. There are two options here:

a) Erase and create.

 (spot1) [HttpConn4] // the pool removes the old one and creates a new connection
 (spot2) [HttpConn2] -- ThreadB
 (spot3) [HttpConn3] -- ThreadC

b) Restore.

 (spot1) [HttpConn1] // the pool is able to "reset" the existing resource
 (spot2) [HttpConn2] -- ThreadB
 (spot3) [HttpConn3] -- ThreadC

"Releasing the connection back" could be translated as "now there's an available spot/resource again". The Pool can now give a connection to ThreadZ:

 (spot1) [HttpConn1] -- ThreadZ
 (spot2) [HttpConn2] -- ThreadB
 (spot3) [HttpConn3] -- ThreadC
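The ThreadA/ThreadZ walkthrough can be reproduced with a toy pool. This is a minimal, single-threaded sketch under stated assumptions: SpotPoolDemo, Conn and lease() are hypothetical names, the status check runs only when a new request arrives, and only option a) erase and create is implemented, so ThreadZ receives HttpConn4 rather than a restored HttpConn1. Instead of blocking, lease() returns null while every spot is busy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical three-spot pool modelling the example above; not Apache's CPool.
class SpotPoolDemo {

    /** Stand-in for an HTTP connection. */
    static final class Conn {
        final String name;
        boolean closed;

        Conn(String name) {
            this.name = name;
        }

        void close() {
            closed = true;
        }
    }

    /** One spot: the pooled connection and the thread holding it (null if free). */
    private static final class Spot {
        Conn conn;
        String owner;
    }

    private final List<Spot> spots = new ArrayList<>();
    private int created = 0;

    SpotPoolDemo(int size) {
        for (int i = 0; i < size; i++) {
            Spot spot = new Spot();
            spot.conn = newConn();
            spots.add(spot);
        }
    }

    private Conn newConn() {
        return new Conn("HttpConn" + (++created));
    }

    /** Option a) erase and create: replace every closed connection and free its spot. */
    private void recycleClosedSpots() {
        for (Spot spot : spots) {
            if (spot.conn.closed) {
                spot.conn = newConn();
                spot.owner = null;
            }
        }
    }

    /** Status check triggered by a new request; returns null if every spot is busy. */
    Conn lease(String requester) {
        recycleClosedSpots();
        for (Spot spot : spots) {
            if (spot.owner == null) {
                spot.owner = requester;
                return spot.conn;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        SpotPoolDemo pool = new SpotPoolDemo(3);
        Conn a = pool.lease("ThreadA");
        pool.lease("ThreadB");
        pool.lease("ThreadC");
        System.out.println("ThreadZ before release: " + pool.lease("ThreadZ")); // null, pool is full
        a.close();                                  // ThreadA finishes and closes its connection
        System.out.println("ThreadZ after release: " + pool.lease("ThreadZ").name);
    }
}
```

Real pools usually block the waiting requester (or queue it) instead of returning null; the null return just keeps the sketch deterministic.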

consume/toString - connection release

All of the above means that calling close() on the InputStream triggers the connection release.

This happens both in consume (if the entity is streaming) and in toString:

public static void consume(final HttpEntity entity) throws IOException 
{ 
    if (entity == null) 
        return; 
   
    if (entity.isStreaming()) 
    { 
        InputStream instream = entity.getContent(); 
        if (instream != null) 
            instream.close();   // <-- connection release
    } 
} 

public static String toString(final HttpEntity entity, final Charset defaultCharset) 
                              throws IOException, ParseException 
{ 
    Args.notNull(entity, "Entity"); 
    InputStream instream = entity.getContent(); 
    if (instream == null) { 
        return null; 
    } 
    try { 
        Args.check(entity.getContentLength() <= Integer.MAX_VALUE,  
                "HTTP entity too large to be buffered in memory"); 
        int i = (int)entity.getContentLength(); 
        if (i < 0) { 
            i = 4096; 
        } 
        Charset charset = null; 
        try { 
            ContentType contentType = ContentType.getOrDefault(entity); 
            charset = contentType.getCharset(); 
        } catch (UnsupportedCharsetException ex) { 
            throw new UnsupportedEncodingException(ex.getMessage()); 
        } 
        if (charset == null) { 
            charset = defaultCharset; 
        } 
        if (charset == null) { 
            charset = HTTP.DEF_CONTENT_CHARSET; 
        } 
        Reader reader = new InputStreamReader(instream, charset); 
        CharArrayBuffer buffer = new CharArrayBuffer(i); 
        char[] tmp = new char[1024]; 
        int l; 
        while((l = reader.read(tmp)) != -1) { 
            buffer.append(tmp, 0, l); 
        } 
        return buffer.toString(); 
    } finally { 
        instream.close();     // <--- connection release
    } 
} 
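To see why closing in a finally clause matters, here is a minimal sketch built only on java.io. ReleasingInputStream and ReleaseOnCloseDemo are hypothetical: the release counter stands in for whatever the real connection manager does when the stream is closed, and readAll() copies the read-then-close shape of EntityUtils.toString. The "connection" is released exactly once both when the body is read successfully and when a read fails with a SocketTimeoutException.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;

// Hypothetical model: close() is the hook that hands the connection back.
class ReleaseOnCloseDemo {

    /** Counts releases; a real pool would mark the connection reusable instead. */
    static final class ReleasingInputStream extends FilterInputStream {
        int releases = 0;

        ReleasingInputStream(InputStream in) {
            super(in);
        }

        @Override
        public void close() throws IOException {
            try {
                super.close();
            } finally {
                releases++;   // stand-in for "connection released to the pool"
            }
        }
    }

    /** Same shape as EntityUtils.toString: read everything, close in finally. */
    static String readAll(InputStream instream) throws IOException {
        try {
            Reader reader = new InputStreamReader(instream, StandardCharsets.UTF_8);
            StringBuilder sb = new StringBuilder();
            char[] tmp = new char[1024];
            int l;
            while ((l = reader.read(tmp)) != -1) {
                sb.append(tmp, 0, l);
            }
            return sb.toString();
        } finally {
            instream.close();   // runs on success and when read() throws
        }
    }

    /** A stream whose first read fails the way a socket read timeout does. */
    static InputStream timingOutStream() {
        return new InputStream() {
            @Override
            public int read() throws IOException {
                throw new SocketTimeoutException("Read timed out");
            }
        };
    }

    static int releasesAfterSuccess() {
        byte[] body = "{\"ok\":true}".getBytes(StandardCharsets.UTF_8);
        ReleasingInputStream stream = new ReleasingInputStream(new ByteArrayInputStream(body));
        try {
            readAll(stream);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return stream.releases;
    }

    static int releasesAfterTimeout() {
        ReleasingInputStream stream = new ReleasingInputStream(timingOutStream());
        try {
            readAll(stream);
        } catch (IOException expected) {
            // the SocketTimeoutException arrives here, after the finally clause ran
        }
        return stream.releases;
    }

    public static void main(String[] args) {
        System.out.println("success: " + releasesAfterSuccess() + " release(s)");
        System.out.println("timeout: " + releasesAfterTimeout() + " release(s)");
    }
}
```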

What happens to the connection if there is a SocketTimeoutException and I don't consume the HttpEntity?

As you may notice, both methods throw an IOException, and SocketTimeoutException is a subclass of it. It is the caller's responsibility to catch this exception and close all resources if such a scenario happens. For example:

import java.io.IOException;
import java.io.InputStream;
import org.apache.http.HttpEntity;
import org.apache.http.util.EntityUtils;

void tryConsume(HttpEntity httpEntity) throws IOException
{
   try
   {
      //...
      EntityUtils.consume(httpEntity);
      //...
   }
   catch (IOException e)
   {
      // e.g. a SocketTimeoutException happened. Log the error, etc.
      // (Close resources here...)
   }
   finally
   {
      // ...or use a finally clause and close them here, if you want
      // them closed regardless of success or failure.
      if (httpEntity != null)
      {
         InputStream instream = httpEntity.getContent();
         if (instream != null)
            instream.close();   /* <-- connection release. When checking this
                                   spot, the pool will get (e.g.) an EOF
                                   exception. This leads to replacing the
                                   resource with a fresh connection and
                                   marking the spot as available. */
      }
   }
}

Notice that if a SocketTimeoutException is thrown, specific PoolEntry implementations could also detect that the resource is invalid without a close() call. Using close() guarantees that the Pool will recycle the spot after a correct use, and it can also serve as an "invalid marker" when you are able to catch a thrown exception.

But specific Pool implementations will also be able to detect an invalid resource even if an uncaught exception prevented you from calling close(), as they can check the status with different mechanisms. For example, they can check how long a connection has been in the IDLE state. If this time exceeds a certain threshold set by the Pool, the spot will be recycled without the need for a previous close() call from the client.

In this case the Pool is the end that calls close() on it, avoiding a possible deadlock on the client side if the client doesn't manage the max connection time or certain exceptions.
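The idle-time check described above can be sketched as follows. It is a minimal model, not the API of any real connection manager: IdleEvictionDemo, Conn, maxIdleMillis and evictIdle() are hypothetical, and the current time is passed in as a parameter so the behaviour is deterministic. The pool itself calls close() on any connection idle longer than the threshold and frees its spot, whether or not the client ever closed it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical pool-side idle eviction; real pools expose their own settings for this.
class IdleEvictionDemo {

    /** Stand-in for a pooled connection that remembers when it was last used. */
    static final class Conn {
        final String name;
        final long lastUsedMillis;
        boolean closed;

        Conn(String name, long lastUsedMillis) {
            this.name = name;
            this.lastUsedMillis = lastUsedMillis;
        }

        void close() {
            closed = true;
        }
    }

    final List<Conn> entries = new ArrayList<>();
    private final long maxIdleMillis;

    IdleEvictionDemo(long maxIdleMillis) {
        this.maxIdleMillis = maxIdleMillis;
    }

    /** Closes and removes every entry idle for longer than maxIdleMillis; returns the count. */
    int evictIdle(long nowMillis) {
        int evicted = 0;
        for (Iterator<Conn> it = entries.iterator(); it.hasNext(); ) {
            Conn conn = it.next();
            if (nowMillis - conn.lastUsedMillis > maxIdleMillis) {
                conn.close();   // the pool is the end that calls close()
                it.remove();    // the spot is free for a new connection
                evicted++;
            }
        }
        return evicted;
    }

    public static void main(String[] args) {
        IdleEvictionDemo pool = new IdleEvictionDemo(30_000);
        pool.entries.add(new Conn("HttpConn1", 0));       // client never closed it
        pool.entries.add(new Conn("HttpConn2", 50_000));  // used recently
        System.out.println("evicted: " + pool.evictIdle(60_000));
        System.out.println("remaining: " + pool.entries.size());
    }
}
```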

aran
  • Thanks, I should have been more specific in my question though. I looked at instream.close() and if we take for example ContentInputStream or ByteArrayInputStream, which part of close() code is triggering the connection release? InputStream.close() seems to be doing nothing. – A.D Dec 15 '20 at 00:51
  • The underlying definition of releasing is based on the nature of pools. Closing means destroying that resource, hence one spot of the pool is available again. The pool will then generate a new connection (if needed), fully available to the next clients. That's what releasing back means. – aran Dec 15 '20 at 01:01
  • I added some info regarding this in the edit – aran Dec 15 '20 at 01:05
  • For example, let's take PoolingNHttpClientConnectionManager, which is what the Elasticsearch RestClient uses. How does this connection manager realize that the connection is free to serve another request because of instream.close()? Also, you mentioned closing the resource if EntityUtils.consume(httpEntity) throws an IOException, but I thought EntityUtils.consume(httpEntity) itself is used to close the resource. How do we close the resource if EntityUtils.consume(httpEntity) itself throws an exception? – A.D Dec 15 '20 at 01:18
  • Because the pool constantly checks whether its pooled resources are closed or not. If one is closed, it removes it from the pool and creates a new available one. If one end closes the stream, the other one will get an exception: this is what the pool checks in order to know whether it must generate a new one. – aran Dec 15 '20 at 01:19
  • Looking at the source code, as ContentInputStream.close() or InputStream.close() is doing nothing, how does calling instream.close() help PoolingNHttpClientConnectionManager to decide whether the resource is closed or not? – A.D Dec 15 '20 at 01:57
  • If you close a stream, what happens if you try to read from it afterwards? An exception is thrown. The pool checks the status of every resource constantly. If it gets an exception, it knows that the resource was closed. The logic is implicit in the resource's behaviour: https://stackoverflow.com/a/51830113/2148953 – aran Dec 15 '20 at 02:02
  • There's no specific method like "tellPoolThatIFreedThis", "releaseThis", or similar. The logic is based on exceptions. Just close it --> the other end (the pool) will therefore get an exception. That's the point. – aran Dec 15 '20 at 02:07
  • InputStream's close() does nothing because its implementations must define the logic to close the stream. ContentInputStream's close() method DOES implement this: https://jar-download.com/artifacts/com.liferay/com.liferay.petra.json.web.service.client/14.0.12/source-code/org/apache/http/nio/entity/ContentInputStream.java – aran Dec 15 '20 at 02:15
  • Check the close method. It loops while read() returns >= 0. The point is that buffers return -1 when the end is reached: it forces the stream's EOF exception if someone else tries to read from it. – aran Dec 15 '20 at 02:19
  • https://hc.apache.org/httpcomponents-core-ga/httpcore-nio/apidocs/org/apache/http/nio/util/ContentInputBuffer.html#read(byte[],%20int,%20int) – aran Dec 15 '20 at 02:19
  • Thank you for the explanation! But do you know where in the source code PoolingNHttpClientConnectionManager checks, for each connection, whether the resource is freed or not? Is it calling stream.read(), and if it receives an exception or EOF, does it consider the connection free to be used by other requests? – A.D Dec 15 '20 at 02:22
  • Also, in the second part of the answer, you mentioned closing the resource if EntityUtils.consume(httpEntity) throws an IOException, but I thought EntityUtils.consume(httpEntity) itself is used to close the resource by calling instream.close(). How do we close the resource if EntityUtils.consume(httpEntity) itself throws an IOException? – A.D Dec 15 '20 at 02:27
  • Check the processPendingRequests method here: http://www.javased.com/index.php?source_dir=httpcore/httpcore-nio/src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java – aran Dec 15 '20 at 02:50
  • When it calls entry.isClosed(), it calls into the specific entry type, which implements the logic for knowing whether it is closed. Looking through the subclasses of Entry, you'll find the EOF check that makes this method return TRUE. processPendingRequests periodically cleans up, removes, and creates resources for the pool --> in this implementation, the status check is triggered when a new connection is requested. Other implementations start background threads that check the status without the need for any trigger. – aran Dec 15 '20 at 02:52
  • Regarding your consume() question, the stream will only be closed if it finished consuming. The point is that you have access to instream from outside that method, so if an exception is thrown, you should catch it in the caller's code and explicitly call instream.close(). – aran Dec 15 '20 at 02:55
  • Notice the CPool variable inside PoolingNHttpClientConnectionManager. It extends https://hc.apache.org/httpcomponents-core-ga/httpcore/apidocs/org/apache/http/pool/AbstractConnPool.html. The behaviour should be similar to the linked AbstractNIOConnPool. That's the key. – aran Dec 15 '20 at 03:00
  • Updated the answer in order to explain the stream closing if an exception is thrown – aran Dec 15 '20 at 03:04
  • Thanks Aran! Regarding SocketTimeoutException, restClient.performRequest() doesn't return a response; it just throws the exception - https://github.com/elastic/elasticsearch/blob/master/client/rest/src/main/java/org/elasticsearch/client/RestClient.java#L268. So I don't have access to the response or its instream in order to close it when there is a SocketTimeoutException. – A.D Dec 15 '20 at 06:05
  • You mentioned "Finding through subclasses of Entry, you'll find the EOF check in order to return TRUE from this method". Do you remember which subclass did you see where this check is happening in isClosed()? – A.D Dec 15 '20 at 06:18
  • Even if an exception is thrown, note that this is a future, in the performRequest method: httpResponse = client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, null).get(); This means that it will return something to close: an HttpResponse. Check the execute method here: https://github.com/xSke/CoreServer/blob/master/src/org/apache/http/impl/nio/client/CloseableHttpAsyncClient.java – aran Dec 15 '20 at 06:24
  • From the HttpResponse, you can call the getEntity() method: https://hc.apache.org/httpcomponents-core-ga/httpcore/apidocs/org/apache/http/HttpResponse.html#getEntity() – aran Dec 15 '20 at 06:27
  • And from this entity, which is a HttpEntity https://hc.apache.org/httpcomponents-core-ga/httpcore/apidocs/org/apache/http/HttpEntity.html --> you can finally call getContent() , which returns the inputStream you want to close – aran Dec 15 '20 at 06:28
  • Regarding the isClosed() reference: all PoolEntry implementations must define this method. Here you have the base abstract isClosed reference: https://hc.apache.org/httpcomponents-core-ga/httpcore/apidocs/org/apache/http/pool/PoolEntry.html#isClosed() – aran Dec 15 '20 at 06:32
  • But performRequest() is not returning HttpResponse if there is a SocketTimeoutException: https://github.com/elastic/elasticsearch/blob/master/client/rest/src/main/java/org/elasticsearch/client/RestClient.java#L289. It seems to be throwing the exception directly. – A.D Dec 15 '20 at 06:37
  • Futures are meant to behave that way; even if an exception is thrown, you will get the future later. The execute method will do it asynchronously, even if an exception is thrown first. Check this: https://stackoverflow.com/questions/34269015/getting-a-result-in-the-future – aran Dec 15 '20 at 06:44
  • Here an example of an implementation of a PoolEntry. Check the isClosed() method: https://jar-download.com/artifacts/org.apache.httpcomponents/httpcore-nio/4.4.11/source-code/org/apache/http/impl/nio/pool/BasicNIOPoolEntry.java – aran Dec 15 '20 at 06:44
  • More about getting futures: https://stackoverflow.com/questions/48109628/how-to-get-future-return-when-cause-timeout-exception-in-java – aran Dec 15 '20 at 06:50
  • @AD Double-check the behaviour of the https://github.com/elastic/elasticsearch/blob/master/client/rest/src/main/java/org/elasticsearch/client/RestClient.java#L289 method. Notice the client.execute(..).get() call. That's a future to be returned regardless of any exception. This future is an HttpResponse; from that, you can get the stream (see previous comments). – aran Dec 15 '20 at 06:54
  • Not sure if I'm missing something, but Future is not returned by performRequest() right, in order for me to do future.get()? Are you saying https://github.com/elastic/elasticsearch/blob/master/client/rest/src/main/java/org/elasticsearch/client/RestClient.java#L289 will return future or HttpResponse when there is a SocketTimeoutException? Where is that happening? – A.D Dec 15 '20 at 06:55
  • On the execute method within the client, which ends with a .get(). Line 279 in your link, please check its behaviour twice. Execute will return a future here --> https://github.com/xSke/CoreServer/blob/master/src/org/apache/http/impl/nio/client/CloseableHttpAsyncClient.java this is an asynchronous call and is not stopped even if an exception has been thrown – aran Dec 15 '20 at 06:56
  • But that is not returned to the caller of performRequest() in case there is a SocketTimeoutException or an IOException as seen in Line 289: https://github.com/elastic/elasticsearch/blob/master/client/rest/src/main/java/org/elasticsearch/client/RestClient.java#L289 – A.D Dec 15 '20 at 07:03
  • No! Please re-read the performRequest method. It will only throw the exception if no more nodes are left, and it will recursively call itself again before throwing it. This means the previous futures will be returned regardless of IOExceptions happening. These futures will wrap the underlying IOExceptions until no more nodes are left and line 289 is executed --> JUST ONCE, if all nodes fail. – aran Dec 15 '20 at 07:09
  • Read the description above the method: Failing hosts are marked dead and retried after a * certain amount of time (minimum 1 minute, maximum 30 minutes), depending * on how many times they previously failed (the more failures, the later * they will be retried). In case of failures all of the alive nodes (or * dead nodes that deserve a retry) are retried until one responds or none * of them does, in which case an {@link IOException} will be thrown. – aran Dec 15 '20 at 07:14
  • But when every node is throwing SocketTimeoutException, and all nodes are exhausted i.e., when nodeTuple.nodes.hasNext() returns false, it will throw exception without returning response right? – A.D Dec 15 '20 at 07:15
  • Or let me put it this way, when performRequest() executes line 289 here: https://github.com/elastic/elasticsearch/blob/master/client/rest/src/main/java/org/elasticsearch/client/RestClient.java#L289, nothing will be returned to the caller other than the Exception right? – A.D Dec 15 '20 at 07:17
  • By the time the IOException happens (just once, again), you'll have received multiple futures, HttpResponses, telling you that something is wrong. Line 289 will only be executed if ALL NODES FAIL. Look at what is happening just before line 289: if (nodeTuple.nodes.hasNext()) { return performRequest(nodeTuple, request, cause); } --> It will call itself recursively avoiding the IOException --> letting you receive the futures, until the last try is done, until all nodes failed. Then and only then line 289 will be executed. – aran Dec 15 '20 at 07:18
  • Please read the description of the performRequest() method within your link. It's all there. – aran Dec 15 '20 at 07:23
  • Line 286 is avoiding the execution of line 289, until all nodes fail. Before that, you'll get the futures. Only if all retries fail this method will throw the IOException. – aran Dec 15 '20 at 07:24
  • For simplicity, assume there is only one node, line 286 wouldn't even execute in that case if SocketTimeoutException is returned by that one node right? – A.D Dec 15 '20 at 07:26
  • If a SocketTimeoutException occurs within the execute method, it will be caught INSIDE execute and returned as a future. If the SocketTimeout happens while waiting for the get() operation, which is a totally different thing, it will be thrown if only one retry is set -> the difference is between getting a socketTimeout while EXECUTING and getting it while WAITING for the future. If the second happens, line 289 will be executed. But if the SocketTimeout happens within the EXECUTE method, you'll get an HttpResponse. The difference here must be clear. – aran Dec 15 '20 at 08:04
  • The only operation that leads to an IOException, line 289, is in the get() method. Execute won't ever throw this, even if a SocketTimeout happened inside it. Its responsibility is to wrap the exception and return it as a response. – aran Dec 15 '20 at 08:06
  • As a result, it is obvious you won't be able to get an InputStream, because no future was ever returned --> not because of a SocketTimeout, but because get() couldn't retrieve any future that wrapped it. The retries are meant to at least try to retrieve one of those futures (HttpResponses) that will wrap this error. – aran Dec 15 '20 at 08:09
  • https://hc.apache.org/httpcomponents-asyncclient-4.1.x/httpasyncclient/apidocs/org/apache/http/nio/client/HttpAsyncClient.html Notice that no exception will ever be thrown from these methods. If one happens, they must wrap it in the response. – aran Dec 15 '20 at 08:27
  • https://docs.oracle.com/javase/6/docs/api/java/util/concurrent/Future.html#get() This is the only method that will throw it. Not because of an error in the execution, but because it was interrupted while waiting for the future. – aran Dec 15 '20 at 08:29
  • SocketTimeoutException is always returned by get() right, as we are waiting for response and it doesn't come till the duration of socket timeout? Would SocketTimeoutException also happen in execute()? – A.D Dec 15 '20 at 08:30
  • If it does, it will be returned in the HttpResponse --> it is timing out while establishing the connection. If it throws the IOException, it means no retries were set and the timeout happened while waiting for the future --> it is timing out on the get() wait. That's the point of the retries: try getting one future in time (which would give you access to the exception, as well as to the underlying stream). – aran Dec 15 '20 at 08:34
  • I believe if there is timeout during establishing connection, it would be connection timeout and not Socket timeout. So whenever there is a SocketTimeOut when I'm using Elastic Search RestClient, it looks like I don't need to do anything to free up the resources. – A.D Dec 15 '20 at 08:38
  • God, we should grab a coffee somewhere to sum this up. The summary might become a book. – aran Dec 15 '20 at 08:39
  • That's it. That's also the point of pools. They manage resources and know when to close them (if properly implemented), even if the client/process can't explicitly close them. Also, as they cap the number of concurrent resources in use, you avoid the memory leak that would happen if multiple connections were left unclosed. – aran Dec 15 '20 at 08:40
  • Thank you for your time. Regarding https://jar-download.com/artifacts/org.apache.httpcomponents/httpcore-nio/4.4.11/source-code/org/apache/http/impl/nio/pool/BasicNIOPoolEntry.java where you mentioned to look at close(), I couldn't figure out where is this looking for EOF in the stream. – A.D Dec 15 '20 at 08:47
  • The EOF is just a possibility for the check, if using buffers and ContentInputStream. In this case, follow the NHttpConnection used in the BasicNIOPoolEntry --> it is a Subclass of HttpConnection, where the close method is located: https://github.com/wso2/wso2-commons-httpclient/blob/4e00cbb34b3775ca148b811bfd6de2945a52ed18/commons-httpclient/src/main/java/org/apache/commons/httpclient/HttpConnection.java#L1214 – aran Dec 15 '20 at 09:00
  • Close() will call that linked method, which is responsible for closing the streams. The key is that it sets a boolean (isOpen) to false. In this case, the PoolEntry implementation only has to check whether isOpen==false to know if the resource (the HttpConnection) was closed. – aran Dec 15 '20 at 09:02
  • This abstraction avoids having to catch an exception in order to know whether a resource is closed. The only issue in this scenario would be: isOpen is set to false, but the streams were never closed (because of an exception). The pool will believe the stream is not in use, while the streams are still open. This would also be managed by the pool, when it tries to assign the not-yet-closed stream to another requester. In this case, it's the pool that will need to explicitly call close on them, in order to avoid open connections that are not assigned to any client running in the JVM. – aran Dec 15 '20 at 09:23
  • In another scenario, the stream is interrupted but not handled, so isOpen is never set to false. The pool must implement some mechanism to detect these deadlocks (it will never be notified that this resource is invalid); for example, limiting the maximum lifetime of a connection, checking how long it has been IDLE, etc. – aran Dec 15 '20 at 09:24
  • Notice that not all types of PoolEntries will implement the isOpen() method, nor have any flag. That's where the "raw" method of catching exceptions (such as EOF) would happen. – aran Dec 15 '20 at 09:29
  • @aran For instream.close(); I am getting the following error - org.apache.http.ConnectionClosedException: Premature end of chunk coded message body: closing chunk expected – YourAboutMeIsBlank Sep 09 '21 at 03:54
  • @YourAboutMeIsBlank probably trying to use the response once closed. Check this, may help: https://stackoverflow.com/questions/33633549/org-apache-http-connectionclosedexception-premature-end-of-chunk-coded-message – aran Sep 10 '21 at 23:14
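Several comments above turn on how a failure inside an asynchronous call reaches the caller of Future.get(). For reference, this is the standard java.util.concurrent contract, shown with a CompletableFuture standing in for the future returned by the async HTTP client (an assumption made for illustration; FutureFailureDemo is a hypothetical name). When the task completes with an exception, get() does not return a value; it throws an ExecutionException whose cause is the original exception.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical demo of Future.get() semantics; CompletableFuture stands in for the client's future.
class FutureFailureDemo {

    /** Returns the exception get() reports when the async task failed with a read timeout. */
    static Throwable causeSeenByGet() {
        CompletableFuture<String> response = new CompletableFuture<>();
        // Simulate the async exchange failing while reading the response.
        response.completeExceptionally(new SocketTimeoutException("Read timed out"));
        try {
            response.get();
            return null;                 // not reached: get() throws instead of returning
        } catch (ExecutionException e) {
            return e.getCause();         // the original SocketTimeoutException
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        Throwable cause = causeSeenByGet();
        System.out.println(cause.getClass().getSimpleName() + ": " + cause.getMessage());
    }
}
```

Whether a given client unwraps that ExecutionException and rethrows the cause is up to the client code calling get().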