The main goal is to send XML files from one folder through a REST service into a Cassandra DB. What I'm trying to do is read all the files inside a certain folder and, for each one, create a Worker object with the file path set on it.

        while (RUNS > 0) {
            ExecutorService executor = Executors.newFixedThreadPool(N_THREADS);

            File dir = new File(PATH_TO_SAMPLES);
            File[] listFiles = dir.listFiles();

            if (listFiles != null) {

                for (File file : listFiles) {

                    Worker worker = new Worker();
                    worker.setPath(file.toPath());

                    executor.submit(worker);
                }
            }

            executor.shutdown();

            // Wait until all threads are finished
            while (!executor.isTerminated()) {
            }

            Thread.sleep(1000);

            RUNS--;
        }

After that, the executor takes the worker instance and the loop moves on to the next file in the directory. RUNS is initialized with the total number of iterations (100_000 by default). N_THREADS is the number of threads in the pool (100 by default).
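As a side note, the loop above spins on isTerminated() in an empty while loop, which keeps one CPU core busy while waiting. A minimal sketch of the same submit-then-wait lifecycle using awaitTermination is shown here. The class name PoolLifecycleSketch, the 30-second timeout and the counter tasks are assumptions for illustration only, and this does not by itself address the BindException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolLifecycleSketch {

    /** Runs every task on a fixed pool and blocks until all finish or the timeout elapses. */
    static boolean runAll(List<Runnable> tasks, int nThreads, long timeoutSeconds) {
        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
        for (Runnable task : tasks) {
            executor.submit(task);
        }
        // Stop accepting new tasks; already submitted ones still run.
        executor.shutdown();
        try {
            // Blocks without spinning; returns false if the timeout elapses first.
            return executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            executor.shutdownNow();
            return false;
        }
    }

    /** Hypothetical demo: each stand-in task just increments a counter. */
    static int demo(int taskCount) {
        AtomicInteger done = new AtomicInteger();
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            tasks.add(done::incrementAndGet);
        }
        boolean finished = runAll(tasks, 4, 30);
        return finished ? done.get() : -1;
    }
}
```

In the question's code, the Worker instances would take the place of the counter tasks.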

The Worker class implements Runnable. Its run method:

    @Override
    public void run() {

        String url = getUrl();
        String payload = "xml_file_representation";

        MultiThreadedHttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager();

        HttpClient client = new HttpClient();

        HttpConnectionManagerParams httpConnectionManagerParams = new HttpConnectionManagerParams();
        connectionManager.setParams(httpConnectionManagerParams);

        client.setHttpConnectionManager(connectionManager);

        PostMethod postMethod = new PostMethod(url);

        try {

            postMethod.setRequestHeader("User-Agent", USER_AGENT);
            postMethod.setRequestHeader("Content-Type", "application/xml");

            postMethod.setRequestEntity(new StringRequestEntity(payload, "application/xml", StandardCharsets.UTF_8.toString()));

            int statusCode = client.executeMethod(postMethod);

            InputStream body = postMethod.getResponseBodyAsStream();

            if (statusCode == HttpStatus.SC_OK) {

                //OK
            }

        } catch (Exception e) {
            LOG.error("POST: ERROR!");
        } finally {
            postMethod.releaseConnection();
            connectionManager.shutdown();
        }
    }

If I remove the wait, i.e.

Thread.sleep(1000);

at the end of each run, then after ~16_000 requests have been sent I get an exception:

java.net.BindException: Address already in use

It's very similar to the question "BindException: address already in use on a client socket?"

However, the accepted answer there didn't help me. I have no more ideas about what I need to do to close those "connections" in order to prevent that error.

A workaround such as Thread.sleep() doesn't look like a good solution either. Thanks for any help or advice.

dofamin
  • Curious. Just a guess, but from your description it sounds possible that the creation of threads is running ahead of the closing of them, and you're running out of available ports. Then again, it looks like you are properly closing the HTTP clients before terminating the thread, which should avoid this. It's just possible your finally block is invoking native code that doesn't get executed before the thread returns. – Elliotte Rusty Harold Feb 03 '16 at 16:08
  • @Berger Perfect suggestion, thanks. Method A (where a global HttpClient is shared between all the threads) resolved my issue. – dofamin Feb 04 '16 at 08:20
  • @ElliotteRustyHarold I'm not sure what the real cause was, but a global HttpClient and MultiThreadedHttpConnectionManager resolved my problem. – dofamin Feb 04 '16 at 08:23

1 Answer

Creating a connection manager for every worker defeats the purpose of the connection manager. Judging by the other post linked by Berger, it is supposed to be shared among threads. One per worker is like having none.

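The OS keeps sockets lingering for a while after they are closed (the TCP TIME_WAIT state), so each new connection uses up another local port until none are free. There is not much you can do other than reusing the sockets (connections) with the design proposed in that other question.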
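The reuse idea can be sketched as follows. This is a stand-in, not the asker's setup: it uses the JDK's built-in java.net.http.HttpClient (Java 11+) rather than Commons HttpClient 3.x, and the class name SharedClientSketch, the throwaway local server and the "<sample/>" payload are assumptions made so the example is self-contained. With Commons HttpClient 3.x the corresponding design would be a single MultiThreadedHttpConnectionManager passed to a single HttpClient, both created once and shared by all workers instead of being built inside run().

```java
import com.sun.net.httpserver.HttpServer;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SharedClientSketch {

    // Created once and shared by every worker thread, so connections can be reused.
    static final HttpClient CLIENT = HttpClient.newHttpClient();

    /** Posts one XML payload through the shared client and returns the status code. */
    static int post(URI uri, String xml) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/xml")
                .POST(HttpRequest.BodyPublishers.ofString(xml))
                .build();
        return CLIENT.send(request, HttpResponse.BodyHandlers.discarding()).statusCode();
    }

    /** Hypothetical demo: sends POSTs to a throwaway local server, returns how many got 200. */
    static int demo(int requests, int nThreads) {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            server.createContext("/", exchange -> {
                exchange.getRequestBody().readAllBytes(); // drain the request body
                exchange.sendResponseHeaders(200, -1);    // 200 OK, no response body
                exchange.close();
            });
            server.start();
            URI uri = URI.create("http://127.0.0.1:" + server.getAddress().getPort() + "/");
            ExecutorService pool = Executors.newFixedThreadPool(nThreads);
            try {
                List<Future<Integer>> results = new ArrayList<>();
                for (int i = 0; i < requests; i++) {
                    Callable<Integer> task = () -> post(uri, "<sample/>");
                    results.add(pool.submit(task));
                }
                int ok = 0;
                for (Future<Integer> result : results) {
                    if (result.get() == 200) {
                        ok++;
                    }
                }
                return ok;
            } finally {
                pool.shutdown();
                server.stop(0);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The point of the design is that the client outlives individual requests, so workers borrow existing connections rather than opening and closing a socket per file.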

Unless you want to risk playing with Socket.setSoLinger() (see "TCP option SO_LINGER (zero) - when it's required").
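For reference only, this is roughly what that option looks like on a plain java.net.Socket. It is a sketch under assumptions: the class name SoLingerSketch and the loopback connection are illustrative, and the question's HTTP client does not expose its sockets this directly. Enabling SO_LINGER with a timeout of 0 makes close() reset the connection instead of closing it gracefully, which avoids the lingering state but can discard unsent data, hence the risk.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

class SoLingerSketch {

    /** Opens a loopback connection, enables SO_LINGER with timeout 0, and returns the setting. */
    static int lingerZeroDemo() {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {
            // on = true, linger = 0 seconds: close() aborts the connection (RST)
            // rather than performing the normal graceful shutdown.
            client.setSoLinger(true, 0);
            // getSoLinger() returns -1 when the option is disabled.
            return client.getSoLinger();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```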

The Thread.sleep(1000) is certainly not a viable option: you could need more than 1 second another day, or the OS could change how many sockets it keeps lingering, or for how long, under other circumstances.

user2023577