0

I am implementing an elastic pool for my spring boot project also I am using spring boot 2.1.4 and elastic search 7.3.0. I am stuck at this. When any API trying to query it gives java.net.ConnectException: Connection refused. I want to use configuration with customizeHttpClient with a setting thread count. So It makes only one connection when application will start and querying to the database using that one connection only till bean will destroy.
I tried with this Elastic Configuration:

    import java.io.IOException;
    
    import org.apache.http.HttpHost;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.UsernamePasswordCredentials;
    import org.apache.http.client.CredentialsProvider;
    import org.apache.http.impl.client.BasicCredentialsProvider;
    import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
    import org.apache.http.impl.nio.reactor.IOReactorConfig;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.beans.factory.config.ConfigurableBeanFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Scope;
    import org.springframework.stereotype.Component;
    
    @Component
    public class ElasticConfig{
    
        public static String host;
    
        private static String port;
    
        private static String protocol;
    
        private static String username;
    
        private static String password;
    
        private RestHighLevelClient client;
    
        @Value("${dselastic.host}")
        public void setHost(String value) {
            host = value;
        }
    
        @Value("${dselastic.port}")
        public void setPort(String value) {
            port = value;
        }
    
        @Value("${dselastic.protocol}")
        public void setProtocol(String value) {
            protocol = value;
        }
    
        @Value("${dselastic.username}")
        public void setUsername(String value) {
            username = value;
        }
    
        @Value("${dselastic.password}")
        public void setPassword(String value) {
            password = value;
        }
    
        @Bean(destroyMethod = "cleanUp")
        @Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON)
        public void prepareConnection() {
            RestClientBuilder restBuilder = RestClient.builder(new HttpHost(host, Integer.valueOf(port), protocol));
            if (username != null & password != null) {
                final CredentialsProvider creadential = new BasicCredentialsProvider();
                creadential.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
                restBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
    
                        return httpClientBuilder.setDefaultCredentialsProvider(creadential)
                                .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
                    }
                });
                restBuilder.setRequestConfigCallback(requestConfigBuilder -> 
                requestConfigBuilder.setConnectTimeout(10000) // time until a connection with the server is established.
                        .setSocketTimeout(60000) // time of inactivity to wait for packets[data] to receive.
                        .setConnectionRequestTimeout(0)); // time to fetch a connection from the connection pool 0 for infinite.
                client = new RestHighLevelClient(restBuilder);
            }
        }
    
        public void cleanUp() {
            try {
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

I also tried by implementing DisposableBean interface and its destroy method but I got same exception.
This is my API where I am trying to querying a document:

    public class IndexNameController {
    
        @Autowired
        RestHighLevelClient client;
    
        @GetMapping(value = "/listAllNames")
        public ArrayList<Object> listAllNames(HttpSession session) {
            ArrayList<Object> results = new ArrayList<>();
            try {
                SearchRequest searchRequest = new SearchRequest();
                SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
                searchRequest.indices("indexname");
                String[] fields = { "name", "id" };
                searchSourceBuilder.fetchSource(fields, new String[] {});
                searchSourceBuilder.query(QueryBuilders.matchAllQuery()).size(10000);
                searchSourceBuilder = searchSourceBuilder.sort(new FieldSortBuilder("createdTime").order(SortOrder.DESC));
                searchRequest.source(searchSourceBuilder);
                SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
                SearchHit[] searchHits = searchResponse.getHits().getHits();
                for (SearchHit searchHit : searchHits) {
                    Map<String, Object> map = new HashMap<>();
                    map.put("value", searchHit.getSourceAsMap().get("id"));
                    map.put("name", searchHit.getSourceAsMap().get("name"));
                    results.add(map);
                }
                return results;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return new ArrayList<>();
        }
    }   

When it's trying to query it gives exception at client.search(). This is the Stack Trace:

    java.net.ConnectException: Connection refused
        at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:788)
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:218)
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:205)
        at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1454)
        at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1424)
        at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1394)
        at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:930)
        at com.incident.response.controller.IncidentController.listAllIncidents(IncidentController.java:569)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:189)
        at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
        at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:892)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797)
        at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
        at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1038)
        at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942)
        at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005)
        at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:897)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:634)
        at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:741)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:200)
        at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
        at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490)
        at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)
        at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
        at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
        at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
        at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408)
        at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
        at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:834)
        at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1415)
        at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:171)
        at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:145)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
        at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
        at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
        ... 1 more

Please help me to get rid from this. All help and suggestions will be appriciate.

danronmoon
  • 3,814
  • 5
  • 34
  • 56
Deep Dalsania
  • 375
  • 3
  • 8
  • 22
  • _connection refused_ means that there is no cluster/server at the configured address – P.J.Meisch May 27 '20 at 09:33
  • I added a response of the configured address please check @P.J.Meisch – Deep Dalsania May 27 '20 at 09:51
  • the question is: what are the values that effectively used in `RestClient.builder(new HttpHost(host, Integer.valueOf(port), protocol));` – P.J.Meisch May 27 '20 at 18:19
  • from what host did you issue the query that gave you the response? And where is your application running that tries to connect to Elasticsearch? It seems you have Elasticsearch running in docker, does your application run in docker as well? – P.J.Meisch May 28 '20 at 04:52
  • yes @P.J.Meisch – Deep Dalsania May 28 '20 at 04:53
  • so this is a networking problem between docker containers. Elasticsearch normally runs on port 9200, as you specify 9100, I support that you map that when starting your dockerized Elasticsearch instance. But within the docker network it would probably still be 9200. I 10.1.3.2 the IP address of the machine you are running docker on and from where you issue the direct connect to ES? Then this can't work for your dockerized Spring Boot application – P.J.Meisch May 28 '20 at 05:16
  • We have two elastic search in docker that's why I specified 9100 because 9200 have another version of elastic search. – Deep Dalsania May 28 '20 at 05:56
  • BUt which machine is 10.1.3.2? Is this the IP of the machine running docker, then this wil not be reachable from a different docker container – P.J.Meisch May 28 '20 at 06:07
  • Thank you @P.J.Meisch I researched a lot and finally, I get rid of this and I wrote an answer because if anyone wants to apply this concept or facing the same issue then it will be helpful for them. – Deep Dalsania May 28 '20 at 08:49
  • Thanks for the downvote @P.J.Meisch but I got my answer by research from [here](https://stackoverflow.com/questions/46166955/how-to-properly-close-raw-restclient-when-using-elastic-search-5-5-0-for-optimal) and [here](https://stackoverflow.com/questions/57109948/sockettimeoutexception-while-retrieving-or-inserting-data-into-elastic-search-by) also I understand both questions and answers and do my thing as I want and I got my answer. – Deep Dalsania May 29 '20 at 11:59

3 Answers3

2
  • I researched a lot that how to get rid of this and I tried many solutions after that finally I'll get the solution all things are working as I want.
  • I am trying to implement an elastic pool that one client used by the the whole project for any query or aggregation and when bean will destroy the will close and my whole query is done by using only by one connection.
  • I changed my Elastic configuration like this:
    @Configuration
    public class ElasticConfig {

        @Autowired
        Environment environment;

        private RestHighLevelClient client;

        @Bean
        public RestHighLevelClient prepareConnection() {
            RestClientBuilder restBuilder = RestClient
                    .builder(new HttpHost(environment.getProperty("zselastic.host").toString(),
                            Integer.valueOf(environment.getProperty("zselastic.port").toString()),
                            environment.getProperty("zselastic.protocol").toString()));
            String username = new String(environment.getProperty("zselastic.username").toString());
            String password = new String(environment.getProperty("zselastic.password").toString());
            if (username != null & password != null) {
                final CredentialsProvider creadential = new BasicCredentialsProvider();
                creadential.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
                restBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {

                        return httpClientBuilder.setDefaultCredentialsProvider(creadential)
                                .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
                    }
                });

                restBuilder.setRequestConfigCallback(requestConfigBuilder -> 
                requestConfigBuilder.setConnectTimeout(10000) // time until a connection with the server is established.
                        .setSocketTimeout(60000) // time of inactivity to wait for packets[data] to receive.
                        .setConnectionRequestTimeout(0)); // time to fetch a connection from the connection pool 0 for infinite.

                client = new RestHighLevelClient(restBuilder);
                return client;
            }
            return null;
        }

        /*
         * it gets called when bean instance is getting removed from the context if
         * scope is not a prototype
         */
        /*
         * If there is a method named shutdown or close then spring container will try
         * to automatically configure them as callback methods when bean is being
         * destroyed
         */
        @PreDestroy
        public void clientClose() {
            try {
                this.client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
  • So now my bean return the RestHighLevelClient and it will be use by whole project and I got every API repsponse instead of java.net.ConnectException: Connection refused.
  • Also, I check node stats by using this http://host:port/_nodes/stats/http. When my Spring boot application starts it will initiate one connection to the elasticsearch and one entry will be added into current_open. After that, no connection will increase until my application is running all the queries, and aggregation is being performed by using this connection only. When my application will shutdown or stop connection will be close and entry removed from the current_open.
  • So now I can conclude that I applied elastic pool by using this configuration.
Deep Dalsania
  • 375
  • 3
  • 8
  • 22
0

In my case, I forgot to add the @Bean annotation at the method which created the Bean. Follow an implementation class of the AbstractElasticsearchConfiguration.

package org.lauksas.elasticsearch.configuration;

import java.time.Duration;

import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
import org.springframework.data.elasticsearch.core.ElasticsearchEntityMapper;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.EntityMapper;
import org.springframework.http.HttpHeaders;

@Configuration
public class RestClientConfig extends AbstractElasticsearchConfiguration {

  @Value("${elasticsearch.host}")
  private String host;
  @Value("${elasticsearch.port}")
  private int port;
  @Value("${elasticsearch.username}")
  private String username;
  @Value("${elasticsearch.password}")
  private String password;

  Logger log = LoggerFactory.getLogger(getClass());

  @Bean
  @Override
  public EntityMapper entityMapper() {
    ElasticsearchEntityMapper entityMapper = new ElasticsearchEntityMapper(elasticsearchMappingContext(),
        new DefaultConversionService());
    entityMapper.setConversions(elasticsearchCustomConversions());

    return entityMapper;
  }

  @Override
  @Bean
  public RestHighLevelClient elasticsearchClient() {
    HttpHeaders headers = new HttpHeaders();
    headers.setBasicAuth(username, password);

    final ClientConfiguration clientConfiguration = ClientConfiguration.builder().connectedTo(host + ":" + port)
        .usingSsl().withBasicAuth(username, password).withSocketTimeout(Duration.ofMinutes(10))
        .build();

    return RestClients.create(clientConfiguration).rest();
  }

  @Bean
  public RestClient restClient() {
    HttpHeaders headers = new HttpHeaders();
    headers.setBasicAuth(username, password);

    final ClientConfiguration clientConfiguration = ClientConfiguration.builder().connectedTo(host + ":" + port)
        .usingSsl().withBasicAuth(username, password).build();

    return RestClients.create(clientConfiguration).lowLevelRest();
  }

  @Bean
  @Primary
  public ElasticsearchOperations elasticsearchTemplate() {
    return elasticsearchOperations();
  }
}
lauksas
  • 533
  • 7
  • 14
0

For me, earlier I was using RestClientBuilder and later I started using RestHighLevelClient which helped me get rid off the timeout issue.

rj4u
  • 87
  • 10