I want to use the ExecutorService to run a series of the same Runnable/Callable tasks. I've searched around for a tutorial or an example, but nothing that involves actually setting the value of an existing Runnable/Callable object and then using submit() to send that object back into the ExecutorService.
Basically, here's what I want to do:
- Get a list of servers.
- Iterate thru the list of servers, calling
InetAddress.getByName(host)
to grab data on each host. - Collect that data into Server beans for storage in a database.
So, right now, with 10,000(+) servers, it takes forever. So, my thought was to use the ExecutorService to manage a pool of threads. What I can't seem to figure out is how to detect when one thread is finished so I can grab the data. Then I need to get the next server in the list, place it into the Task and then submit() back to the ExecutorService.
That said, what I've read so far seems to point to the following, ExecutorService, submit(), Callable, Future.
So, as a psuedo-process:
- Get list of servers.
- Set up ExecutorService with numThreads number of threads
- Iterate numThreads and create numThreads WorkerTask() objects.
- Submit() WorkerTask() to ExecutorService for processing.
- Detect when a WorkerTask() has finished, grab the Callable (Future) result.
- Get the next server.
- Set the server value into the WorkerTask() <-- How? This is elusive ...
- Submit() the WorkerTask() (with the new value) back to the ExecutorService.
- Iterate again.
- ExecutorService.shutdown()...
So, a tutorial or example of this would be great...especially an example of placing a new value into a WorkerTask(). Also, I'd appreciate any thoughts on this proposed solution? Is this bad, good, or if there is another way, I'm open.
02/09/2014 - Edit and Add
Hi, So the following is a first cut at this. But to answer some question being posed:
- I've solved the issue of placing new data in a Worker and resubmitting to the ExecutorService...see the code snippet.
- I've also solved the issue of "get the stuff"...I simply cast the Future() results to the Worker class...see the code snippet.
- Finally, while I could just allocate every server to a Worker() and a Future(), I'm concerned that the current 10,000 will grow and memory will become an issue.
That said, here's a first attempt, and this works pretty well. Runs much faster and only uses the getNumbnerThreads() Worker and Future objects:
public List<ServerBean> lookupHostIps ( List<ServerBean> theServerList ) {
//ServerBean serverDto = null;
ServerBean ipDto = null;
List<ServerBean> theResults = new ArrayList<ServerBean>();
List<HostLookupWorker> theWorkers = new ArrayList<HostLookupWorker>( getNumberThreads() );
List<Future<HostLookupWorker>> theFutures = new ArrayList<Future<HostLookupWorker>>( getNumberThreads() );
ExecutorService executor = Executors.newFixedThreadPool ( getNumberThreads() );
// WORKERS : Create the workers...prime them with a server
// bean...
//
for (int j = 0; j < getNumberThreads(); j++) {
//for (int j = 0; j < theServerList.size(); j++) {
theWorkers.add ( new HostLookupWorker( theServerList.get(j) ) );
Future<HostLookupWorker> theFuture = executor.submit ( theWorkers.get ( j ) );
theFutures.add ( j, theFuture );
}
int lloopItems = getNumberThreads(); /* loops thru all servers */
//int lloopThreads = 0; /* loops thru threads */
int lidxThread = 0; /* what thread is ready */
//int lidxFuture = 0; /* what future is ready */
boolean lblnNext = false; /* is a thread done/ready */
int lidxWorkers = 0; /* tracks the futures */
while ( lloopItems < theServerList.size() ) {
// READY : Is one of the threads ready for more work?
if ( lblnNext ) {
// VALUE : Grab the thread by index and set the next
// server value.
theWorkers.get ( lidxThread ).setBean ( theServerList.get(lloopItems) );
getLog().debug ( "Thread [" + lidxThread + "] Assigned Host ["+theServerList.get(lloopItems).getServerName ()+"] " );
// FUTURE : Package a new Future<HostLookupWorker>
// and submit it to the thread pool.
Future<HostLookupWorker> theFuture = executor.submit ( theWorkers.get ( lidxThread ) );
theFutures.add ( lidxThread, theFuture );
lblnNext = false; /* reset to allow for another thread */
lloopItems++; /* increment the main loop counter */
}
while ( !(lblnNext) ) {
try {
if ( theFutures.get(lidxWorkers).get() != null ) {
// GET THE STUFF : Grab the results from the Future...
HostLookupWorker ltheItem = theFutures.get(lidxWorkers).get();
if ( ltheItem.getValue () != null ) {
if (!ltheItem.getValue ().contains("Cannot find host")){
ipDto = new ServerBean ();
ipDto.setServerId ( ltheItem.getBean ().getServerId() );
ipDto.setServerName ( ltheItem.getBean ().getServerName() );
ipDto.setIpAddress ( ltheItem.getValue () );
theResults.add(ipDto);
}
lidxThread = lidxWorkers; /* this thread is ready for more work */
lblnNext = true; /* flag the upper condition to assign new work */
getLog().debug ( "Thread [" + lidxThread + "] Host ["+ltheItem.getHost ()+"] has IP ["+ltheItem.getValue()+"]" );
}
}
else {
getLog().debug ( "Thread [" + lidxThread + "] NULL" );
}
lidxWorkers++; /* next worker/future */
if ( lidxWorkers >= getNumberThreads() ) {
lidxWorkers = 0;
}
}
catch(ExecutionException e){
getLog().error ( e );
}
catch(InterruptedException e){
getLog().error ( e );
}
}
}
executor.shutdown ();
return theResults;
}
Here's the Worker/Thread class :
import java.net.*;
import java.util.concurrent.Callable;
import com.lmig.cdbatch.dto.ServerBean;
public class HostLookupWorker implements Callable {
private InetAddress node = null;
private String value = null;
private boolean busy = false;
private ServerBean bean = null;
public HostLookupWorker () {
this.busy = false;
}
// public HostLookupWorker ( String theHost ) {
// this.busy = false;
// this.host = theHost;
// }
public HostLookupWorker ( ServerBean theItem ) {
this.busy = false;
this.bean = theItem;
//this.host = theItem.getServerName ().trim ();
}
public String lookup ( String host ) {
if ( host != null ) {
// get the bytes of the IP address
try {
this.node = InetAddress.getByName ( host );
}
catch ( UnknownHostException ex ) {
this.value = "Not Found [" + getHost() + "]";
return "Not Found [" + host + "]";
}
if ( isHostname(host) ) {
getBean().setIpAddress ( node.getHostAddress() );
return node.getHostAddress();
}
else { // this is an IP address
//return node.getHostName();
return host;
}
}
return host;
} // end lookup
public boolean isHostname(String host) {
// Is this an IPv6 address?
if (host.indexOf(':') != -1)
return false;
char[] ca = host.toCharArray();
// if we see a character that is neither a digit nor a period
// then host is probably a hostname
for (int i = 0; i < ca.length; i++) {
if (!Character.isDigit(ca[i])) {
if (ca[i] != '.')
return true;
}
}
// Everything was either a digit or a period
// so host looks like an IPv4 address in dotted quad format
return false;
} // end isHostName
// public void run() {
// value = lookup ( getHost() );
//
// }
public Object call() throws Exception {
Thread.sleep ( 10000 );
this.busy = true;
this.value = lookup ( getHost() );
this.busy = false;
return this;
}
public String getHost() {
return getBean().getServerName ().trim ();
}
public void setHost(String host) {
getBean().setServerName ( host.trim () );
}
public InetAddress getNode() {
return node;
}
public void setNode(InetAddress node) {
this.node = node;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public boolean isBusy() {
return busy;
}
public void setBusy(boolean busy) {
this.busy = busy;
}
public ServerBean getBean() {
return bean;
}
public void setBean(ServerBean bean) {
this.bean = bean;
}
}
So, to summarize:
- The process does work, and works fast.
- I need to fix the code a little, as there are getNumberThreads() - 1 Futures left unprocessed when the larger while () loop finally finishes...
So, what I'm struggling with now is how to detect when a thread is finished...I've seen multiple examples, one testing for Future() != null, others testing for Future() == null. So which one is right?