
I have tried all the options provided in the existing threads like this, this and this.

I will have hundreds of resource-intensive commands, each running anywhere from a few minutes to 4-5 hours. I want at most 5 concurrent threads serving these commands.
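
To make that requirement concrete, here is a minimal sketch of a pool that never runs more than a fixed number of tasks at once. It is an illustration under assumptions, not the code from this post: `BoundedPoolSketch` and `runAll` are hypothetical names, and a 50 ms sleep stands in for a long-running command.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; the names here are illustrative and not part of the post's code.
final class BoundedPoolSketch {

    /** Runs taskCount placeholder tasks on at most maxThreads threads and returns the peak concurrency seen. */
    static int runAll(int taskCount, int maxThreads) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        for (int i = 0; i < taskCount; i++) {
            pool.submit(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the highest count observed
                try {
                    Thread.sleep(50); // assumption: stands in for a long-running command
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }

        pool.shutdown(); // accept no new tasks; already queued tasks still run
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            pool.shutdownNow(); // timed out: interrupt whatever is still running
        }
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Peak concurrency: " + runAll(12, 5));
    }
}
```

The worker threads of such a pool are non-daemon by default, so the JVM stays alive until `shutdown()` is called, even after every task has finished.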

Right now, I am trying out the code with just 10 mkdir commands on Windows. The output is correct and I don't get any exceptions; however, the 10 processes never terminate!

One command, for example:

"testthread1"~mkdir "E:\\Omkar\\Development\\Files\\java\\testthread1"

The Runnable is below. Note that process.waitFor() is commented out in the finally block (I also tried it in the try block right after start(), with the same result). If I enable it, only 5 directories are created:

public class CommandRunner implements Runnable {

    private static final Logger logger = LogManager
            .getLogger(CommandRunner.class);

    private String command;
    private boolean isRemoteExecution = false;
    private Properties configurations;
    private String commandReferenceName;

    public CommandRunner(String command, boolean isRemoteExecution,
            Properties configurations, String commandReferenceName) {
        super();
        this.command = command;
        this.isRemoteExecution = isRemoteExecution;
        this.configurations = configurations;
        this.commandReferenceName = commandReferenceName;

    }

    @Override
    public void run() {
        // TODO Auto-generated method stub

        if (isRemoteExecution) {

            executeCommandRemotely();
        } else {
            executeCommandLocally();
        }
    }

    private void executeCommandRemotely() {

    }

    private void executeCommandLocally() {

        ProcessBuilder processBuilder = getProcessBuilder(command);
        Process process = null;

        try {

            logger.debug("Starting " + commandReferenceName);

            process = processBuilder.start();
            // int errCode = process.waitFor();
            //
            // logger.debug("Executed ", this, " errors ? ", (errCode == 0 ?
            // "No"
            // : "Yes"));
            //
            printInputStreamTraditionalWay(process.getInputStream());

        } catch (IOException e) {
            // TODO Auto-generated catch block
            logger.error("IOException", e);
        } finally {
            logger.debug("finally block for thread " + commandReferenceName);
            try {
                //process.waitFor();
                process.getInputStream().close();
                process.getOutputStream().flush();
                process.getOutputStream().close();
                process.getErrorStream().close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                logger.error("IOException in finally block", e);
            }

        }

    }

    private void printInputStreamTraditionalWay(InputStream inputStream) {

        byte[] byteArray = new byte[1024];

        try {
            while (inputStream.available() != 0) {
                inputStream.read(byteArray);
                logger.debug(new String(byteArray));
            }

            logger.debug("Input stream consumed, returning !");
        } catch (IOException e) {
            // TODO Auto-generated catch block
            logger.error(
                    "IOException while reading Process input stream TraditionalWay!",
                    e);
        } finally {
            try {
                inputStream.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                logger.error(
                        "IOException while closing Process input stream TraditionalWay!",
                        e);
            }
        }
    }

    private void printInputStreamBufferedWay(InputStream inputStream) {
        // TODO Auto-generated method stub
        BufferedReader br = null;

        br = new BufferedReader(new InputStreamReader(inputStream));
        String line = null;

        try {
            while ((line = br.readLine()) != null) {

                logger.debug(line + System.getProperty("line.separator"));

            }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            logger.error(
                    "IOException while reading Process input stream BufferedWay!",
                    e);
        } finally {
            try {
                br.close();
                inputStream.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                logger.error(
                        "IOException while closing Process input stream BufferedWay!",
                        e);
            }

        }

    }

    private ProcessBuilder getProcessBuilder(String command) {
        // TODO Auto-generated method stub
        String operSys = System.getProperty("os.name").toLowerCase();
        String shell = null;
        String executeCommandOption = null;

        if (operSys.contains("win")) {
            shell = configurations.getProperty(Constants.WINDOWS_SHELL);
            executeCommandOption = configurations
                    .getProperty(Constants.WINDOWS_EXEC_OPTION);
        } else if (operSys.contains("nix") || operSys.contains("nux")
                || operSys.contains("aix")) {
            shell = configurations.getProperty(Constants.LINUX_SHELL_1);
            executeCommandOption = configurations
                    .getProperty(Constants.LINUX_EXEC_OPTION_1);
        } else if (operSys.contains("mac")) {

        } else if (operSys.contains("sunos")) {

        }

        ProcessBuilder processBuilder = new ProcessBuilder(shell,
                executeCommandOption, command);

        processBuilder.redirectErrorStream(true);
        processBuilder.inheritIO();

        return processBuilder;
    }

    public boolean isRemoteExecution() {
        return isRemoteExecution;
    }

    public void setRemoteExecution(boolean isRemoteExecution) {
        this.isRemoteExecution = isRemoteExecution;
    }

    public String getCommand() {
        return command;
    }

    public void setCommand(String command) {
        this.command = command;
    }

    @Override
    public String toString() {
        return "CommandRunner [commandReferenceName=" + commandReferenceName
                + "]";
    }

}
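
For comparison, here is a minimal sketch of the blocking pattern that is usually suggested for a single command: start the process, read its output until end of stream, then call `waitFor()`. It is not a diagnosis of the class above. `BlockingCommandSketch`, `Result` and `run` are hypothetical names, and the sketch assumes the shell is invoked with an option that runs one command and then exits (`cmd /c` on Windows, `sh -c` elsewhere); the values actually configured for `Constants.WINDOWS_SHELL` and `Constants.WINDOWS_EXEC_OPTION` are not shown in this post. Two differences from the class above are deliberate. It does not call `inheritIO()`, because with inherited I/O `Process.getInputStream()` returns a null input stream with nothing to read. It also loops on `readLine()` rather than `available()`, because `available()` only reports bytes that can be read without blocking and may be 0 before the child has written anything.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these names come from the code above.
final class BlockingCommandSketch {

    /** Exit code plus the merged stdout/stderr lines of one finished command. */
    static final class Result {
        final int exitCode;
        final List<String> lines;

        Result(int exitCode, List<String> lines) {
            this.exitCode = exitCode;
            this.lines = lines;
        }
    }

    static Result run(String command) throws IOException, InterruptedException {
        boolean windows = System.getProperty("os.name").toLowerCase().startsWith("windows");
        // Assumption: the shell runs one command and then exits ("/c" or "-c").
        ProcessBuilder builder = windows
                ? new ProcessBuilder("cmd", "/c", command)
                : new ProcessBuilder("sh", "-c", command);
        builder.redirectErrorStream(true); // merge stderr into stdout; no inheritIO()

        Process process = builder.start();
        process.getOutputStream().close(); // the child sees end-of-file on its stdin

        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream()))) {
            String line;
            // readLine() blocks until data arrives and returns null only at end of stream.
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        // The output is fully drained, so waitFor() cannot stall on a full pipe buffer.
        return new Result(process.waitFor(), lines);
    }

    public static void main(String[] args) throws Exception {
        Result result = run("echo hello");
        System.out.println("exit=" + result.exitCode + " output=" + result.lines);
    }
}
```

Because `run` blocks until the child exits, a pool thread that calls it stays busy for the whole lifetime of the command, which is what keeps the number of live processes at or below the pool size.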

The main class that submits the tasks to the executor:

public class CommandExecutor {

    private static final Logger logger = LogManager
            .getLogger(CommandExecutor.class);

    private static final int argsCount = 2;

    public static void main(String[] args) {
        // TODO Auto-generated method stub

        if (args == null || args.length < argsCount) {
            logger.error("Please provide params as follows : \n <config.-properties-file-name> : The absolute properties file path where the config. is stored \n <commands-file-name> : The absolute file path where the commands are stored with one command per line");
            return;
        }

        new CommandExecutor().executeCommandsInParallel(args[0], args[1]);
    }

    private void executeCommandsInParallel(
            String configPropertiesFileAbsolutePath,
            String commandFileAbsolutePath) {

        /* MethodCall : Validate paths */

        /* MethodCall : Load configurations */
        Properties configurations = null;
        try {
            configurations = getConfiguration(configPropertiesFileAbsolutePath);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        /* MethodCall : Read the commands from the file in a Collection<String> */
        Collection<String> commands = null;
        try {
            commands = getCommands(commandFileAbsolutePath);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        /* Start reading the commands and scheduling them */
        if (commands != null && !commands.isEmpty()) {

            ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(
                    Integer.valueOf(configurations
                            .getProperty(Constants.SQOOP_IMPORT_CONCURRENCY_FACTOR)));
            threadPoolExecutor
                    .setMaximumPoolSize(Integer.valueOf(configurations
                            .getProperty(Constants.SQOOP_IMPORT_CONCURRENCY_FACTOR)));

            Collection<Future> submittedCommandsFuture = new ArrayList<Future>();

            for (String commandString : commands) {

                String commandStringArray[] = commandString
                        .split(configurations
                                .getProperty(Constants.SPLIT_COMMAND_NAME_BY));

                CommandRunner commandRunner = new CommandRunner(
                        commandStringArray[1], Boolean.valueOf(configurations
                                .getProperty(Constants.IS_REMOTE_EXECUTION)),
                        configurations, commandStringArray[0]);

                logger.debug("Submitting " + commandStringArray[0]
                        + " for execution");

                Future commandRunnerFuture = threadPoolExecutor
                        .submit(commandRunner);
                submittedCommandsFuture.add(commandRunnerFuture);

                //printThreadPoolStats(threadPoolExecutor);
            }

        } else {
            System.out.println("No commands found, returning !");
            return;
        }

    }

    private Properties getConfiguration(String configPropertiesFileAbsolutePath)
            throws IOException {

        Properties configuration = new Properties();

        configuration.load(Files.newInputStream(FileSystems.getDefault()
                .getPath(configPropertiesFileAbsolutePath)));

        return configuration;

    }

    private Collection<String> getCommands(String commandFileAbsolutePath)
            throws IOException {

        List<String> commands = null;

        Path commandFilePath = FileSystems.getDefault().getPath(
                commandFileAbsolutePath);
        commands = Files.readAllLines(commandFilePath);

        return commands;
    }

    private void printThreadPoolStats(ThreadPoolExecutor poolExecutor) {
        logger.debug("Active tasks : " + poolExecutor.getActiveCount()
                + ", Completed tasks : " + poolExecutor.getCompletedTaskCount()
                + ", Pool size : " + poolExecutor.getPoolSize()
                + ", Scheduled Tasks : " + poolExecutor.getTaskCount());
    }

    static class CustomThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        CustomThreadFactory(String factoryName) {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() : Thread.currentThread()
                    .getThreadGroup();
            namePrefix = factoryName + poolNumber.getAndIncrement()
                    + "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix
                    + threadNumber.getAndIncrement(), 0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
}

The output of the program, which never terminates, is below (taken from the Eclipse console; Windows Task Manager shows 10 cmd processes). The Eclipse project path is E:\Omkar\Development\Directories\Workspaces\Tech\Homework\16032016\Scratchpad:

[DEBUG] 2016-08-22 16:55:37.176 [main] CommandExecutor - Submitting "testthread1" for execution
[DEBUG] 2016-08-22 16:55:37.178 [main] CommandExecutor - Submitting "testthread2" for execution
[DEBUG] 2016-08-22 16:55:37.178 [main] CommandExecutor - Submitting "testthread3" for execution
[DEBUG] 2016-08-22 16:55:37.178 [main] CommandExecutor - Submitting "testthread4" for execution
[DEBUG] 2016-08-22 16:55:37.178 [pool-2-thread-1] CommandRunner - Starting "testthread1"
[DEBUG] 2016-08-22 16:55:37.179 [main] CommandExecutor - Submitting "testthread5" for execution
[DEBUG] 2016-08-22 16:55:37.179 [pool-2-thread-3] CommandRunner - Starting "testthread3"
[DEBUG] 2016-08-22 16:55:37.178 [pool-2-thread-2] CommandRunner - Starting "testthread2"
[DEBUG] 2016-08-22 16:55:37.179 [pool-2-thread-4] CommandRunner - Starting "testthread4"
[DEBUG] 2016-08-22 16:55:37.179 [main] CommandExecutor - Submitting "testthread6" for execution
[DEBUG] 2016-08-22 16:55:37.179 [main] CommandExecutor - Submitting "testthread7" for execution
[DEBUG] 2016-08-22 16:55:37.179 [main] CommandExecutor - Submitting "testthread8" for execution
[DEBUG] 2016-08-22 16:55:37.179 [pool-2-thread-5] CommandRunner - Starting "testthread5"
[DEBUG] 2016-08-22 16:55:37.179 [main] CommandExecutor - Submitting "testthread9" for execution
[DEBUG] 2016-08-22 16:55:37.181 [pool-2-thread-4] CommandRunner - Input stream consumed, returning !
[DEBUG] 2016-08-22 16:55:37.181 [pool-2-thread-4] CommandRunner - finally block for thread "testthread4"
[DEBUG] 2016-08-22 16:55:37.181 [pool-2-thread-4] CommandRunner - Starting "testthread6"
[DEBUG] 2016-08-22 16:55:37.182 [pool-2-thread-5] CommandRunner - Input stream consumed, returning !
[DEBUG] 2016-08-22 16:55:37.182 [pool-2-thread-5] CommandRunner - finally block for thread "testthread5"
[DEBUG] 2016-08-22 16:55:37.182 [pool-2-thread-5] CommandRunner - Starting "testthread7"
[DEBUG] 2016-08-22 16:55:37.184 [pool-2-thread-3] CommandRunner - Input stream consumed, returning !
[DEBUG] 2016-08-22 16:55:37.184 [pool-2-thread-3] CommandRunner - finally block for thread "testthread3"
[DEBUG] 2016-08-22 16:55:37.184 [pool-2-thread-3] CommandRunner - Starting "testthread8"
[DEBUG] 2016-08-22 16:55:37.186 [pool-2-thread-1] CommandRunner - Input stream consumed, returning !
[DEBUG] 2016-08-22 16:55:37.186 [pool-2-thread-1] CommandRunner - finally block for thread "testthread1"
[DEBUG] 2016-08-22 16:55:37.186 [pool-2-thread-1] CommandRunner - Starting "testthread9"
[DEBUG] 2016-08-22 16:55:37.187 [pool-2-thread-2] CommandRunner - Input stream consumed, returning !
[DEBUG] 2016-08-22 16:55:37.187 [pool-2-thread-2] CommandRunner - finally block for thread "testthread2"
[DEBUG] 2016-08-22 16:55:37.189 [pool-2-thread-1] CommandRunner - Input stream consumed, returning !
[DEBUG] 2016-08-22 16:55:37.189 [pool-2-thread-1] CommandRunner - finally block for thread "testthread9"
[DEBUG] 2016-08-22 16:55:37.191 [pool-2-thread-3] CommandRunner - Input stream consumed, returning !
[DEBUG] 2016-08-22 16:55:37.191 [pool-2-thread-3] CommandRunner - finally block for thread "testthread8"
[DEBUG] 2016-08-22 16:55:37.192 [pool-2-thread-5] CommandRunner - Input stream consumed, returning !
[DEBUG] 2016-08-22 16:55:37.193 [pool-2-thread-5] CommandRunner - finally block for thread "testthread7"
[DEBUG] 2016-08-22 16:55:37.194 [pool-2-thread-4] CommandRunner - Input stream consumed, returning !
[DEBUG] 2016-08-22 16:55:37.194 [pool-2-thread-4] CommandRunner - finally block for thread "testthread6"

E:\Omkar\Development\Directories\Workspaces\Tech\Homework\16032016\Scratchpad>
E:\Omkar\Development\Directories\Workspaces\Tech\Homework\16032016\Scratchpad>
E:\Omkar\Development\Directories\Workspaces\Tech\Homework\16032016\Scratchpad>
E:\Omkar\Development\Directories\Workspaces\Tech\Homework\16032016\Scratchpad>
E:\Omkar\Development\Directories\Workspaces\Tech\Homework\16032016\Scratchpad>
E:\Omkar\Development\Directories\Workspaces\Tech\Homework\16032016\Scratchpad>
E:\Omkar\Development\Directories\Workspaces\Tech\Homework\16032016\Scratchpad>
E:\Omkar\Development\Directories\Workspaces\Tech\Homework\16032016\Scratchpad>
E:\Omkar\Development\Directories\Workspaces\Tech\Homework\16032016\Scratchpad>
Kaliyug Antagonist
  • You have to call Process.destroy() to kill it. For example, you can register a shutdown hook which will do it. – Daniil Dubrovsky Aug 22 '16 at 15:03
  • I don't see command parameters for ProcessBuilder. For example, `mkdir` should be `new ProcessBuilder("mkdir", "foo");` to create directory `foo` and not `new ProcessBuilder("mkdir foo");`. It's important because the result of this would be something like `command mkdir foo not found`. – Mickael Aug 22 '16 at 15:11
  • Please create a [MCVE](http://stackoverflow.com/help/mcve), there is too much code to debug easily. Also we don't see the actual command you send to the process builder which is very important to know the problem. – Codebender Aug 22 '16 at 15:14
  • `processBuilder.inheritIO();` implies that the subprocess will use `stdin` and `stdout` from the parent process rather than communicating via pipes. If you use that, it makes no sense to access the streams from the created `Process`. – Holger Aug 22 '16 at 15:17
  • @Holger Yeah I know, I just added that piece of code after referring the existing threads, however, it doesn't seem to make a difference – Kaliyug Antagonist Aug 23 '16 at 07:37
  • @DaniilDubrovsky At which point do I need to do it ? I tried it in the try as well as finally block, in vain – Kaliyug Antagonist Aug 23 '16 at 07:37
  • @MickaëlB Yes that's what the syntax says but the commands(one example provided in the post) are executing correctly, also, the output is as expected. It's just that the Process doesn't terminate – Kaliyug Antagonist Aug 23 '16 at 07:38
  • @Codebender You are right - even I was apprehensive to post the whole code, the only reason I did it was that someone may point out a mistake. I will edit the original post to include a short and sweet sample code :) – Kaliyug Antagonist Aug 23 '16 at 07:38
  • @KaliyugAntagonist You should store a list of the processes started by your program and create a shutdown hook like this: `Runtime.getRuntime().addShutdownHook(new Thread(() -> processes.forEach(it -> it.destroy())))`. You can store this list in a static context and update it dynamically when new processes are started – Daniil Dubrovsky Aug 23 '16 at 11:56
  • @DaniilDubrovsky The ScheduledThreadPoolExecutor starts the 5 threads(CommandRunner.java instances) which in turn create one ProcessBuilder and Process object each, respectively. I haven't used the Runtime class anywhere, can you elaborate where should I place this code and do I need to use Runtime only ? – Kaliyug Antagonist Aug 23 '16 at 12:07
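
The shutdown-hook suggestion in the comments above could be sketched roughly as follows. This is one possible reading of that suggestion, not a confirmed fix: `ProcessRegistrySketch`, `startTracked`, `untrack` and `destroyAll` are hypothetical names. The hook is registered once through `Runtime.getRuntime()`, which is available in any Java program, and each worker would start its process through `startTracked` instead of calling `ProcessBuilder.start()` directly.

```java
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a static registry of child processes; all names are assumptions.
final class ProcessRegistrySketch {

    // Thread-safe set, because several pool threads may start processes at the same time.
    private static final Set<Process> PROCESSES = ConcurrentHashMap.newKeySet();

    static {
        // Runs when the JVM shuts down normally (for example on System.exit or Ctrl+C).
        Runtime.getRuntime().addShutdownHook(
                new Thread(ProcessRegistrySketch::destroyAll, "destroy-child-processes"));
    }

    private ProcessRegistrySketch() {
    }

    /** Starts the process and remembers it so that it can be destroyed later. */
    static Process startTracked(ProcessBuilder builder) throws IOException {
        Process process = builder.start();
        PROCESSES.add(process);
        return process;
    }

    /** Forgets a process that has already finished on its own. */
    static void untrack(Process process) {
        PROCESSES.remove(process);
    }

    /** Asks every tracked process that is still alive to terminate. */
    static void destroyAll() {
        for (Process process : PROCESSES) {
            if (process.isAlive()) {
                process.destroy();
            }
        }
        PROCESSES.clear();
    }
}
```

A hook only runs once the JVM begins shutting down, so on its own it does not make a program exit while non-daemon pool threads are still alive.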
