2

I have a simple code as below. This checks for alive status for a list of servers. Could you please let me know how this can be done in parallel using threading or any other suitable solutions.

        List<Host> hosts = this.getAllHosts();
        List<Host> aliveHosts = new ArrayList<>();
        if (hosts != null && hosts.size() > 0) {
            for (Host host : hosts) {
                try {
                    if(InetAddress.getByName(host.getIpaddress()).isReachable(TIMEOUT)) {
                        aliveHosts.add(host);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return aliveHosts;

How can I call each getByName in a thread and execute this in parallel at the same time. Currently each of them is having a timeout of 3 seconds. If there are 10 items then the total time would be 30 seconds. Can anyone give a solution so that this can be done in 3-8 seconds overall.

Lajos Arpad
  • 64,414
  • 37
  • 100
  • 175
Shamnad P S
  • 1,095
  • 2
  • 15
  • 43

6 Answers6

6

With Java 8 streams:

List<Host> aliveHosts = hosts.stream()
                             .parallel()
                             .filter(h -> {
                                            try {
                                              return InetAddress.getByName(h.getIpaddress()).isReachable(TIMEOUT)
                                            } catch(Exception e) {
                                              return false;
                                            }
                                          })
                             .collect(Collectors.toList());
Jean-Baptiste Yunès
  • 34,548
  • 4
  • 48
  • 69
0

Let's consider this threading example:

public class SimpleThreads {

    // Display a message, preceded by
    // the name of the current thread
    static void threadMessage(String message) {
        String threadName =
            Thread.currentThread().getName();
        System.out.format("%s: %s%n",
                          threadName,
                          message);
    }

    private static class MessageLoop
        implements Runnable {
        public void run() {
            String importantInfo[] = {
                "Mares eat oats",
                "Does eat oats",
                "Little lambs eat ivy",
                "A kid will eat ivy too"
            };
            try {
                for (int i = 0;
                     i < importantInfo.length;
                     i++) {
                    // Pause for 4 seconds
                    Thread.sleep(4000);
                    // Print a message
                    threadMessage(importantInfo[i]);
                }
            } catch (InterruptedException e) {
                threadMessage("I wasn't done!");
            }
        }
    }

    public static void main(String args[])
        throws InterruptedException {

        // Delay, in milliseconds before
        // we interrupt MessageLoop
        // thread (default one hour).
        long patience = 1000 * 60 * 60;

        // If command line argument
        // present, gives patience
        // in seconds.
        if (args.length > 0) {
            try {
                patience = Long.parseLong(args[0]) * 1000;
            } catch (NumberFormatException e) {
                System.err.println("Argument must be an integer.");
                System.exit(1);
            }
        }

        threadMessage("Starting MessageLoop thread");
        long startTime = System.currentTimeMillis();
        Thread t = new Thread(new MessageLoop());
        t.start();

        threadMessage("Waiting for MessageLoop thread to finish");
        // loop until MessageLoop
        // thread exits
        while (t.isAlive()) {
            threadMessage("Still waiting...");
            // Wait maximum of 1 second
            // for MessageLoop thread
            // to finish.
            t.join(1000);
            if (((System.currentTimeMillis() - startTime) > patience)
                  && t.isAlive()) {
                threadMessage("Tired of waiting!");
                t.interrupt();
                // Shouldn't be long now
                // -- wait indefinitely
                t.join();
            }
        }
        threadMessage("Finally!");
    }
}

Source.

In essence, you need a Runnable which is responsible for the way your threads will work. You will need to instantiate a Thread, passing an instance of the Runnable you have and then start your Thread. You will need to have all the Threads accessible and Join them. You can easily manage the timeout limits as well.

Lajos Arpad
  • 64,414
  • 37
  • 100
  • 175
  • what do you think about the Java8 stream approach?. – Shamnad P S Jul 20 '17 at 07:57
  • 1
    @ShamnadPS it is a modern solution, which should work if you are using Java 8, but with earlier versions of Java it might be incompatible. I believe it is a particular solution for a desirable technical situation, but in the case when you have Java 7 and cannot update it for a valid reason, whatever that might be, the solution involving Java 8 will not work. I have deliberately given you a general solution from a tutorial, because you can get a deeper understanding by reading the whole tutorial and it will work regardless of your Java version. – Lajos Arpad Jul 20 '17 at 08:30
0

Non Java 8 way will look similar:

List<Host> hosts = this.getAllHosts();

    Queue<Host> q = new ArrayBlockingQueue<>(hosts.size(), true, hosts);
    ExecutorService ex = Executors.newFixedThreadPool(5);
    List<Host> aliveHosts = Collections.synchronizedList(new ArrayList<>());

    while(!q.isEmpty()){
        ex.submit(new Runnable() {
            @Override
            public void run() {
                Host host = q.poll();
                try {
                    if(InetAddress.getByName(host.getIpaddress()).isReachable(TIMEOUT)) {
                        aliveHosts.add(host);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }
        });
    }
    ex.shutdown();
}
Aleydin Karaimin
  • 944
  • 1
  • 6
  • 16
0

Java 8 and ExecutorService:

List<Host> hosts = this.getAllHosts();
List<Host> aliveHosts = Collections.synchronizedList(new ArrayList<Host>());
ExecutorService executorService = Executors.newFixedThreadPool(10);
if (hosts != null && hosts.size() > 0) {
    for (Host host : hosts) {
        executorService.submit(() -> {
            try {
                if (InetAddress.getByName(host.getIpaddress()).isReachable(TIMEOUT)) {
                    aliveHosts.add(host);
                }
            } catch (IOException e) {
                // logger?
            }
        });
    }
}
executorService.shutdown();
return aliveHosts;
Adam Siemion
  • 15,569
  • 7
  • 58
  • 92
  • What is the advantage in using the ExecutorService when compared to Java 8 parallel stream. Can you please let me know. – Shamnad P S Jul 20 '17 at 08:05
  • 2
    There is no advantages *per se*, except controlling the level of concurrency. Java8 streams are there just to let you write essential logic and forget about details of runtime trickeries. – Jean-Baptiste Yunès Jul 20 '17 at 08:12
0

In addition to the accepted Java8 answer you can actually control the level of concurrency quite easily by using a custom ForkJoinPool:

final Predicate<Host> isAlive = h -> {
    try {
        return InetAddress.getByName(h.getIpaddress()).isReachable(TIMEOUT);
    } catch (Exception e) {
        return false;
    }
};
final Callable<List<Host>> collectAliveHosts = () ->
    hosts.stream().parallel().filter(isAlive).collect(Collectors.toList());

final ForkJoinPool threadPool = new ForkJoinPool(4);
final List<Host> aliveHosts = threadPool.submit(collectAliveHosts).get();

If you don't use a custom pool, the common ForkJoinPool will be used, which is sized according to the number of cores/CPUs your current machine has. This pool is however used by the whole JVM. That is, if you submit long running tasks to the common pool, the whole application might suffer some performance degradation.

dpr
  • 10,591
  • 3
  • 41
  • 71
  • Thank you for the answer. Can you let me know why you have given size 4 for the ForkJoinPool. can it be the size of the host list. – Shamnad P S Jul 21 '17 at 02:21
  • The size 4 of the pool in the example was just randomly taken. You can set the size to any number you want. But a larger pool doesn't always mean faster execution time. The optimal size of the thread pool strongly depends on the actual usecase (e.g. number of hosts), the environment (number of CPUs) and even the code that is executed in the threads (does it block or suspend threads, etc.). Basically you will need to play around with the pool size to determine the value that works best for your. However this value will not necessarily work best if executed in another environment. – dpr Jul 21 '17 at 08:12
0

We can do parallelly using Future interface.

package test.basics;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


public class TestFutureTask {
    private static final int TIMEOUT = 30000;

    public static void main(String[] args) {
        List<String> hosts = new ArrayList<String>();
        hosts.add("127.0.0.1");
        hosts.add("127.0.0.2");
        hosts.add("127.0.0.3");
        hosts.add("127.0.0.4");
        hosts.add("127.0.0.5");
        hosts.add("127.0.0.6");
        List<String> aliveHosts = new ArrayList<>();
        List<String> notAliveHosts = new ArrayList<>();

        long stTime = System.currentTimeMillis();
        System.out.println("Starting time " + stTime);
        Map<String, Future> jobList = new HashMap<>();
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        for (String host : hosts) {

            Future f = newCachedThreadPool.submit(new Callable<Boolean>() {

                private String host;

                @Override
                public Boolean call() throws Exception {
                    return InetAddress.getByName(host).isReachable(TIMEOUT);
                }

                public Callable<Boolean> init(String host) {
                    this.host = host;
                    return this;
                }
            }.init(host));

            jobList.put(host, f);

        }

        for (String host : jobList.keySet()) {
            try {
                if ((boolean) jobList.get(host).get()) {
                    aliveHosts.add(host);
                } else {
                    notAliveHosts.add(host);
                }
            } catch (InterruptedException | ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Ending time : " + endTime);
        System.out.println("Time taken :" + (endTime - stTime));
        System.out.println("Alive hosts: " + aliveHosts);
        System.out.println("Not alive hosts: " + notAliveHosts);
    }
}

Sample output:

Starting time 1500570979858

Ending time : 1500571009872

Time taken :30014

Alive hosts: [127.0.0.1]

Not alive hosts: [127.0.0.6, 127.0.0.5, 127.0.0.4, 127.0.0.3, 127.0.0.2]