I'm writing a multithreaded application that uses a Phaser to know when the work is finished. The problem is that the ExecutorCompletionService can have as many as 100k tasks in its queue, but a Phaser allows at most 65535 unarrived parties. What can I do when the 65536th party needs to register?
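The limit can be reproduced in isolation. This is a minimal sketch, not part of the application: the class name `PhaserLimitDemo` and the helper `registerBeyondLimitFails` are made up for illustration, and it only shows that one registration past 65535 is rejected on a single, untiered Phaser.

```java
import java.util.concurrent.Phaser;

public class PhaserLimitDemo {
    // Documented maximum number of parties for a single Phaser.
    static final int MAX_PARTIES = 65535;

    /** Returns true if registering one party beyond the limit is rejected. */
    static boolean registerBeyondLimitFails() {
        Phaser phaser = new Phaser();
        phaser.bulkRegister(MAX_PARTIES); // fill the phaser completely
        try {
            phaser.register(); // this would be party number 65536
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("65536th register() rejected: " + registerBeyondLimitFails());
    }
}
```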
My example code:
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.*;

    public class Main {
        public static void main(String[] args) throws Exception {
            ExecutorService ec = Executors.newFixedThreadPool(10);
            ExecutorCompletionService<List<String>> ecs =
                    new ExecutorCompletionService<List<String>>(ec);
            Phaser phaser = new Phaser();
            // register first node/thread
            ecs.submit(new SimpleParser("startfile.txt"));
            phaser.register();
            Future<List<String>> future;
            do {
                future = ecs.poll();
                if (future != null && future.get() != null) {
                    addParties(phaser, future.get(), ecs);
                    phaser.arriveAndDeregister();
                }
                if (phaser.isTerminated()) {
                    ec.shutdown();
                }
            } while (!ec.isShutdown() && !phaser.isTerminated());
        }

        public static void addParties(Phaser p, List<String> filenames,
                ExecutorCompletionService<List<String>> ecs) {
            for (int i = 0; i < filenames.size(); i++) {
                ecs.submit(new SimpleParser(filenames.get(i)));
                // PROBLEM = What to do when Phaser has 65535+ unarrived parties
                p.register();
            }
        }

        static class SimpleParser implements Callable<List<String>> {
            String fileName;

            public SimpleParser(String fileName) {
                this.fileName = fileName;
            }

            @Override
            public List<String> call() throws Exception {
                return parseFile();
            }

            private List<String> parseFile() {
                return new ArrayList<String>(Arrays.asList(new String[] {
                        "somefilename1.txt", "somefilename2.txt" }));
            }
        }
    }
The problem is in the addParties() method. A single task (SimpleParser) can return, e.g., 100 new file names, so 100 new tasks are submitted to the ExecutorCompletionService and 100 new parties are registered in the Phaser. I have tried something like this:

    if (p.getUnarrivedParties() == 65535)
        p = new Phaser(p);

to create a chain of phasers, but it didn't help: p.getUnarrivedParties() returns 0 on the new phaser, yet I still can't register the next party on it. This snippet
    System.out.println(p.getUnarrivedParties());
    if (p.getUnarrivedParties() == 65535) {
        p = new Phaser(p);
        System.out.println(p.getUnarrivedParties());
    }
    p.register();
prints:
    65535
    0

and then throws an IllegalStateException.
So how can I create a new Phaser that is connected to the old one?
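One possible direction, sketched under assumptions rather than as a definitive fix: a child phaser registers itself as one party with its parent on its first registration, so `new Phaser(p)` with a full `p` fails as soon as the child tries to register. A sibling arrangement avoids that, with an empty root and several children that each hold a bounded number of parties. The class `TieredPhaserSketch`, the cap `PARTIES_PER_CHILD`, and the `demo` helper are hypothetical names, and the sketch assumes every task arrives on the same child it registered with.

```java
import java.util.concurrent.Phaser;

public class TieredPhaserSketch {
    // Hypothetical cap per child; any value up to 65535 would do.
    static final int PARTIES_PER_CHILD = 1000;

    private final Phaser root = new Phaser();
    private Phaser current = new Phaser(root);
    private int registeredOnCurrent = 0;

    /** Registers one party and returns the child phaser it must later arrive on. */
    synchronized Phaser register() {
        if (registeredOnCurrent == PARTIES_PER_CHILD) {
            // Create a sibling under the root, not a child of the full phaser.
            current = new Phaser(root);
            registeredOnCurrent = 0;
        }
        current.register();
        registeredOnCurrent++;
        return current;
    }

    /** Termination is observed on the root of the tree. */
    boolean isTerminated() {
        return root.isTerminated();
    }

    /** Registers the given number of parties, then lets each one arrive and deregister. */
    static boolean demo(int tasks) {
        TieredPhaserSketch tiers = new TieredPhaserSketch();
        Phaser[] handles = new Phaser[tasks];
        for (int i = 0; i < tasks; i++) {
            handles[i] = tiers.register();
        }
        boolean terminatedTooEarly = tiers.isTerminated();
        for (Phaser handle : handles) {
            handle.arriveAndDeregister();
        }
        return !terminatedTooEarly && tiers.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println("100000 parties handled: " + demo(100_000));
    }
}
```

In this arrangement the root sees one party per non-empty child, so the root's own 65535 limit applies to the number of children rather than to the number of tasks.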
//edit
Thank you @bowmore. I have just two more questions.
Let's look at the example:
    import java.util.concurrent.Phaser;

    public class Test2 {
        public static void main(String[] args) {
            Phaser parent = new Phaser();
            Phaser child1 = new Phaser(parent);
            Phaser child2 = new Phaser(parent);
            child1.register();
            child2.register();
            System.out.println("Parent: " + parent.isTerminated());
            System.out.println("Child1: " + child1.isTerminated());
            System.out.println("Child2: " + child2.isTerminated() + "\n");
            child1.arriveAndDeregister();
            System.out.println("Parent: " + parent.isTerminated());
            System.out.println("Child1: " + child1.isTerminated());
            System.out.println("Child2: " + child2.isTerminated() + "\n");
            child2.arriveAndDeregister();
            System.out.println("Parent: " + parent.isTerminated());
            System.out.println("Child1: " + child1.isTerminated());
            System.out.println("Child2: " + child2.isTerminated() + "\n");
        }
    }
It prints:
    Parent: false
    Child1: false
    Child2: false

    Parent: false
    Child1: false
    Child2: false

    Parent: true
    Child1: true
    Child2: true
Why is child1 not terminated after child1.arriveAndDeregister(), and how can I check whether it actually is?
Second question: I asked about creating a new Phaser only after reaching 65535 parties because I thought it would be wasteful to create thousands of new objects. Do you think that would cause memory problems, or could it even improve performance?
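A small sketch that may help frame the first question. As far as I understand the Phaser documentation, termination belongs to the whole tiered set, so isTerminated() on a child stays false while the root still has parties. The child's own party count is visible through getRegisteredParties(). The class name `ChildPhaserState` and the helper `afterFirstChildLeaves` are made up for illustration.

```java
import java.util.concurrent.Phaser;

public class ChildPhaserState {
    /**
     * Returns "child1 registered parties, child1 terminated, parent registered parties"
     * observed right after child1's only party has arrived and deregistered.
     */
    static String afterFirstChildLeaves() {
        Phaser parent = new Phaser();
        Phaser child1 = new Phaser(parent);
        Phaser child2 = new Phaser(parent);
        child1.register();
        child2.register();
        child1.arriveAndDeregister();
        return child1.getRegisteredParties() + " "
                + child1.isTerminated() + " "
                + parent.getRegisteredParties();
    }

    public static void main(String[] args) {
        // child1 is empty, but child2 still counts as a party of the parent
        System.out.println(afterFirstChildLeaves()); // prints "0 false 1"
    }
}
```

Under that reading, child1.getRegisteredParties() == 0 would be the check for "this child has no work left", while isTerminated() answers the question for the tree as a whole.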