If you do not have any hard requirement on IgniteFuture
(which I believe should not be the case) and all you want is some mechanism to put data into a cache and get it processed asynchronously, and then process the results returned by that operation, then you can use Java's Executor Service.
If you are not much aware of Java's executor service then you may want to read documentation or this answer which highlights quick points, in below example I have also added comments.
Below are few other quick points about number of threads:
- This example creates a
ExecutorService
with "ThreadPoolExecutor
" implementation, there are other options like "Executors.newSingleThreadExecutor()
" and "Executors.newFixedThreadPool(10)
" which lets you define how many threads you want in the JVM.
- You can also choose to directly create object of
ThreadPoolExecutor
like this return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
and you can have more control on how many threads and type of queue. You will need to read more on this if you want to create ThreadPoolExecutor
object yourself and not through ExecutorService
Few other points related to your implementations:
- When you are processing the
Future
objects then it is blocking process, because Future.get()
is a blocking call, so your main thread will be blocked until all Future objects are returned and processed.
- If you do not want blocking then there are several options like you can create a new thread to do all this processing and hence freeing your main thread. Another option could be that do not processing Future objects, so as soon you do
executorService.submit(callableTask1)
, you thread will be free, and then in your CallableTask
object you can push the result in a queue (you can choose Java's queue implementation as per your need) and then process that queue from other thread. Yet another option could be to dedicate another thread for processing your Future objects.
Sample code:
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ExecutorServiceFutureCallableExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
List<Future<String>> futuresList = new ArrayList<>();
ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorServiceFutureCallableExample.CallableTask callableTask1 = new ExecutorServiceFutureCallableExample.CallableTask(2000);
ExecutorServiceFutureCallableExample.CallableTask callableTask2 = new ExecutorServiceFutureCallableExample.CallableTask(1000);
ExecutorServiceFutureCallableExample.CallableTask callableTask3 = new ExecutorServiceFutureCallableExample.CallableTask(3000);
System.out.println("### Starting submitting tasks");
// submit the callable and register the returned future object so that it can be processed later.
futuresList.add(executorService.submit(callableTask1));
futuresList.add(executorService.submit(callableTask2));
futuresList.add(executorService.submit(callableTask3));
System.out.println("### Finished submitting tasks");
for (int i = 0; i < futuresList.size(); i++) {
// here "get()" waits for the future tasks to be returned.
System.out.println(futuresList.get(i).get());
}
System.out.println("### Finished.");
}
static class CallableTask implements Callable<String>{
private long timeToSleep;
CallableTask(long _timeToSleep){
this.timeToSleep = _timeToSleep;
}
@Override
public String call() throws Exception {
String str = new Date() + ": Processing - " + this.hashCode() + " | " + Thread.currentThread() + ", slept for seconds - " + timeToSleep;
System.out.println(str);
Thread.sleep(timeToSleep);
return str + " ||||| completed at: " + new Date();
}
public long getTimeToSleep() {
return timeToSleep;
}
public void setTimeToSleep(long timeToSleep) {
this.timeToSleep = timeToSleep;
}
}
}