I have tried all the options provided in the existing threads like this, this and this.
I will have 100s of resource-intensive commands and each of them will run ranging from few minutes to 4-5 hours. I wish to have max 5 concurrent threads that will serve these commands.
Right now, I am trying out the code on just 10 mkdir commands on Windows. The output is proper and I don't get any exceptions, however, the 10 processess are not terminating !
one command for example :
"testthread1"~mkdir "E:\\Omkar\\Development\\Files\\java\\testthread1"
The Runnable. Note that process.waitFor() is commented in the finally block(I tried it in the try block just after start() as well but same issue), if I use it, only 5 directories are created :
public class CommandRunner implements Runnable {
private static final Logger logger = LogManager
.getLogger(CommandRunner.class);
private String command;
private boolean isRemoteExecution = false;
private Properties configurations;
private String commandReferenceName;
public CommandRunner(String command, boolean isRemoteExecution,
Properties configurations, String commandReferenceName) {
super();
this.command = command;
this.isRemoteExecution = isRemoteExecution;
this.configurations = configurations;
this.commandReferenceName = commandReferenceName;
}
@Override
public void run() {
// TODO Auto-generated method stub
if (isRemoteExecution) {
executeCommandRemotely();
} else {
executeCommandLocally();
}
}
private void executeCommandRemotely() {
}
private void executeCommandLocally() {
ProcessBuilder processBuilder = getProcessBuilder(command);
Process process = null;
try {
logger.debug("Starting " + commandReferenceName);
process = processBuilder.start();
// int errCode = process.waitFor();
//
// logger.debug("Executed ", this, " errors ? ", (errCode == 0 ?
// "No"
// : "Yes"));
//
printInputStreamTraditionalWay(process.getInputStream());
} catch (IOException e) {
// TODO Auto-generated catch block
logger.error("IOException", e);
} finally {
logger.debug("finally block for thread " + commandReferenceName);
try {
//process.waitFor();
process.getInputStream().close();
process.getOutputStream().flush();
process.getOutputStream().close();
process.getErrorStream().close();
} catch (IOException e) {
// TODO Auto-generated catch block
logger.error("IOException in finally block", e);
}
}
}
private void printInputStreamTraditionalWay(InputStream inputStream) {
byte[] byteArray = new byte[1024];
try {
while (inputStream.available() != 0) {
inputStream.read(byteArray);
logger.debug(new String(byteArray));
}
logger.debug("Input stream consumed, returning !");
} catch (IOException e) {
// TODO Auto-generated catch block
logger.error(
"IOException while reading Process input stream TraditionalWay!",
e);
} finally {
try {
inputStream.close();
} catch (IOException e) {
// TODO Auto-generated catch block
logger.error(
"IOException while closing Process input stream TraditionalWay!",
e);
}
}
}
private void printInputStreamBufferedWay(InputStream inputStream) {
// TODO Auto-generated method stub
BufferedReader br = null;
br = new BufferedReader(new InputStreamReader(inputStream));
String line = null;
try {
while ((line = br.readLine()) != null) {
logger.debug(line + System.getProperty("line.separator"));
}
} catch (IOException e) {
// TODO Auto-generated catch block
logger.error(
"IOException while reading Process input stream BufferedWay!",
e);
} finally {
try {
br.close();
inputStream.close();
} catch (IOException e) {
// TODO Auto-generated catch block
logger.error(
"IOException while closing Process input stream BufferedWay!",
e);
}
}
}
private ProcessBuilder getProcessBuilder(String command) {
// TODO Auto-generated method stub
String operSys = System.getProperty("os.name").toLowerCase();
String shell = null;
String executeCommandOption = null;
if (operSys.contains("win")) {
shell = configurations.getProperty(Constants.WINDOWS_SHELL);
executeCommandOption = configurations
.getProperty(Constants.WINDOWS_EXEC_OPTION);
} else if (operSys.contains("nix") || operSys.contains("nux")
|| operSys.contains("aix")) {
shell = configurations.getProperty(Constants.LINUX_SHELL_1);
executeCommandOption = configurations
.getProperty(Constants.LINUX_EXEC_OPTION_1);
} else if (operSys.contains("mac")) {
} else if (operSys.contains("sunos")) {
}
ProcessBuilder processBuilder = new ProcessBuilder(shell,
executeCommandOption, command);
processBuilder.redirectErrorStream(true);
processBuilder.inheritIO();
return processBuilder;
}
public boolean isRemoteExecution() {
return isRemoteExecution;
}
public void setRemoteExecution(boolean isRemoteExecution) {
this.isRemoteExecution = isRemoteExecution;
}
public String getCommand() {
return command;
}
public void setCommand(String command) {
this.command = command;
}
@Override
public String toString() {
return "CommandRunner [commandReferenceName=" + commandReferenceName
+ "]";
}
}
The main class that submits threads to the executor :
public class CommandExecutor {
private static final Logger logger = LogManager
.getLogger(CommandExecutor.class);
private static final int argsCount = 2;
public static void main(String[] args) {
// TODO Auto-generated method stub
if (args == null || args.length < argsCount) {
logger.error("Please provide params as follows : \n <config.-properties-file-name> : The absolute properties file path where the config. is stored \n <commands-file-name> : The absolute file path where the commands are stored with one command per line");
return;
}
new CommandExecutor().executeCommandsInParallel(args[0], args[1]);
}
private void executeCommandsInParallel(
String configPropertiesFileAbsolutePath,
String commandFileAbsolutePath) {
/* MethodCall : Validate paths */
/* MethodCall : Load configurations */
Properties configurations = null;
try {
configurations = getConfiguration(configPropertiesFileAbsolutePath);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
/* MethodCall : Read the commands from the file in a Collection<String> */
Collection<String> commands = null;
try {
commands = getCommands(commandFileAbsolutePath);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
/* Start reading the commands and scheduling them */
if (commands != null && !commands.isEmpty()) {
ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(
Integer.valueOf(configurations
.getProperty(Constants.SQOOP_IMPORT_CONCURRENCY_FACTOR)));
threadPoolExecutor
.setMaximumPoolSize(Integer.valueOf(configurations
.getProperty(Constants.SQOOP_IMPORT_CONCURRENCY_FACTOR)));
Collection<Future> submittedCommandsFuture = new ArrayList<Future>();
for (String commandString : commands) {
String commandStringArray[] = commandString
.split(configurations
.getProperty(Constants.SPLIT_COMMAND_NAME_BY));
CommandRunner commandRunner = new CommandRunner(
commandStringArray[1], Boolean.valueOf(configurations
.getProperty(Constants.IS_REMOTE_EXECUTION)),
configurations, commandStringArray[0]);
logger.debug("Submitting " + commandStringArray[0]
+ " for execution");
Future commandRunnerFuture = threadPoolExecutor
.submit(commandRunner);
submittedCommandsFuture.add(commandRunnerFuture);
//printThreadPoolStats(threadPoolExecutor);
}
} else {
System.out.println("No commands found, returning !");
return;
}
}
private Properties getConfiguration(String configPropertiesFileAbsolutePath)
throws IOException {
Properties configuration = new Properties();
configuration.load(Files.newInputStream(FileSystems.getDefault()
.getPath(configPropertiesFileAbsolutePath)));
return configuration;
}
private Collection<String> getCommands(String commandFileAbsolutePath)
throws IOException {
List<String> commands = null;
Path commandFilePath = FileSystems.getDefault().getPath(
commandFileAbsolutePath);
commands = Files.readAllLines(commandFilePath);
return commands;
}
private void printThreadPoolStats(ThreadPoolExecutor poolExecutor) {
logger.debug("Active tasks : " + poolExecutor.getActiveCount()
+ "Completed tasks : " + poolExecutor.getCompletedTaskCount()
+ "Pool size : " + poolExecutor.getPoolSize()
+ "Scheduled Tasks : " + poolExecutor.getTaskCount());
}
static class CustomThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
CustomThreadFactory(String factoryName) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread()
.getThreadGroup();
namePrefix = factoryName + poolNumber.getAndIncrement()
+ "-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix
+ threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
}
The output of the un-terminated program(from Eclipse console, in Windows Task Manager, I see 10 cmd processes), the Eclipse project path is E:\Omkar\Development\Directories\Workspaces\Tech\Homework\16032016\Scratchpad
[DEBUG] 2016-08-22 16:55:37.176 [main] CommandExecutor - Submitting "testthread1" for execution
[DEBUG] 2016-08-22 16:55:37.178 [main] CommandExecutor - Submitting "testthread2" for execution
[DEBUG] 2016-08-22 16:55:37.178 [main] CommandExecutor - Submitting "testthread3" for execution
[DEBUG] 2016-08-22 16:55:37.178 [main] CommandExecutor - Submitting "testthread4" for execution
[DEBUG] 2016-08-22 16:55:37.178 [pool-2-thread-1] CommandRunner - Starting "testthread1"
[DEBUG] 2016-08-22 16:55:37.179 [main] CommandExecutor - Submitting "testthread5" for execution
[DEBUG] 2016-08-22 16:55:37.179 [pool-2-thread-3] CommandRunner - Starting "testthread3"
[DEBUG] 2016-08-22 16:55:37.178 [pool-2-thread-2] CommandRunner - Starting "testthread2"
[DEBUG] 2016-08-22 16:55:37.179 [pool-2-thread-4] CommandRunner - Starting "testthread4"
[DEBUG] 2016-08-22 16:55:37.179 [main] CommandExecutor - Submitting "testthread6" for execution
[DEBUG] 2016-08-22 16:55:37.179 [main] CommandExecutor - Submitting "testthread7" for execution
[DEBUG] 2016-08-22 16:55:37.179 [main] CommandExecutor - Submitting "testthread8" for execution
[DEBUG] 2016-08-22 16:55:37.179 [pool-2-thread-5] CommandRunner - Starting "testthread5"
[DEBUG] 2016-08-22 16:55:37.179 [main] CommandExecutor - Submitting "testthread9" for execution
[DEBUG] 2016-08-22 16:55:37.181 [pool-2-thread-4] CommandRunner - Input stream consumed, returning !
[DEBUG] 2016-08-22 16:55:37.181 [pool-2-thread-4] CommandRunner - finally block for thread "testthread4"
[DEBUG] 2016-08-22 16:55:37.181 [pool-2-thread-4] CommandRunner - Starting "testthread6"
[DEBUG] 2016-08-22 16:55:37.182 [pool-2-thread-5] CommandRunner - Input stream consumed, returning !
[DEBUG] 2016-08-22 16:55:37.182 [pool-2-thread-5] CommandRunner - finally block for thread "testthread5"
[DEBUG] 2016-08-22 16:55:37.182 [pool-2-thread-5] CommandRunner - Starting "testthread7"
[DEBUG] 2016-08-22 16:55:37.184 [pool-2-thread-3] CommandRunner - Input stream consumed, returning !
[DEBUG] 2016-08-22 16:55:37.184 [pool-2-thread-3] CommandRunner - finally block for thread "testthread3"
[DEBUG] 2016-08-22 16:55:37.184 [pool-2-thread-3] CommandRunner - Starting "testthread8"
[DEBUG] 2016-08-22 16:55:37.186 [pool-2-thread-1] CommandRunner - Input stream consumed, returning !
[DEBUG] 2016-08-22 16:55:37.186 [pool-2-thread-1] CommandRunner - finally block for thread "testthread1"
[DEBUG] 2016-08-22 16:55:37.186 [pool-2-thread-1] CommandRunner - Starting "testthread9"
[DEBUG] 2016-08-22 16:55:37.187 [pool-2-thread-2] CommandRunner - Input stream consumed, returning !
[DEBUG] 2016-08-22 16:55:37.187 [pool-2-thread-2] CommandRunner - finally block for thread "testthread2"
[DEBUG] 2016-08-22 16:55:37.189 [pool-2-thread-1] CommandRunner - Input stream consumed, returning !
[DEBUG] 2016-08-22 16:55:37.189 [pool-2-thread-1] CommandRunner - finally block for thread "testthread9"
[DEBUG] 2016-08-22 16:55:37.191 [pool-2-thread-3] CommandRunner - Input stream consumed, returning !
[DEBUG] 2016-08-22 16:55:37.191 [pool-2-thread-3] CommandRunner - finally block for thread "testthread8"
[DEBUG] 2016-08-22 16:55:37.192 [pool-2-thread-5] CommandRunner - Input stream consumed, returning !
[DEBUG] 2016-08-22 16:55:37.193 [pool-2-thread-5] CommandRunner - finally block for thread "testthread7"
[DEBUG] 2016-08-22 16:55:37.194 [pool-2-thread-4] CommandRunner - Input stream consumed, returning !
[DEBUG] 2016-08-22 16:55:37.194 [pool-2-thread-4] CommandRunner - finally block for thread "testthread6"
E:\Omkar\Development\Directories\Workspaces\Tech\Homework\16032016\Scratchpad>
E:\Omkar\Development\Directories\Workspaces\Tech\Homework\16032016\Scratchpad>
E:\Omkar\Development\Directories\Workspaces\Tech\Homework\16032016\Scratchpad>
E:\Omkar\Development\Directories\Workspaces\Tech\Homework\16032016\Scratchpad>
E:\Omkar\Development\Directories\Workspaces\Tech\Homework\16032016\Scratchpad>
E:\Omkar\Development\Directories\Workspaces\Tech\Homework\16032016\Scratchpad>
E:\Omkar\Development\Directories\Workspaces\Tech\Homework\16032016\Scratchpad>
E:\Omkar\Development\Directories\Workspaces\Tech\Homework\16032016\Scratchpad>
E:\Omkar\Development\Directories\Workspaces\Tech\Homework\16032016\Scratchpad>