I read here that there are several thread-safe Set implementations. In my application, 10 threads concurrently add items to one collection (it does not have to be a Set, but a Set is preferred). After all threads finish, I need to iterate over the collection.
I read that for both ConcurrentSkipListSet and Collections.newSetFromMap(new ConcurrentHashMap()), bulk operations (addAll, removeAll, etc.) are not guaranteed to be atomic and the iterators are only weakly consistent. My experiment seems to confirm this. When I use ConcurrentSkipListSet, the result I read after all threads have finished adding varies from run to run: the set has a different size each time.
I then tried Collections.synchronizedSet(new HashSet<>()), which I assumed would be thread-safe because it blocks concurrent writes. However, it seems to have the same inconsistent-read issue: I still get a different size for the resulting set from run to run.
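To make the expectation concrete, here is a minimal, self-contained sketch that is separate from the real code. The class name `SetSizeCheck`, the method `fillAndCount`, and the `"item-"` keys are made up for illustration. It starts 10 plain threads that all add the same 999 keys, joins every thread, and only then reads the size.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

class SetSizeCheck {
    // Starts `threads` writers that each add the same `perThread` keys to `set`,
    // waits for all of them to finish, and only then reads the size.
    static int fillAndCount(Set<String> set, int threads, int perThread) {
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    set.add("item-" + i); // every thread adds the same keys
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // after join(), this worker's adds are visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return set.size();
    }

    public static void main(String[] args) {
        System.out.println(fillAndCount(new ConcurrentSkipListSet<>(), 10, 999));
        System.out.println(fillAndCount(Collections.synchronizedSet(new HashSet<>()), 10, 999));
    }
}
```

Because the size is read only after every writer has been joined, both calls should report 999 on every run. If an isolated check like this is stable while the real program is not, that would suggest looking at what produces the values being added rather than at the set itself.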
What should I do to make sure the reads are consistent? As I said, I do not have to use a Set. I can use a List or something else, as long as there is a way to avoid adding duplicates.
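As a reference point for the "avoid duplicate adding" requirement, the following is a small sketch using `ConcurrentHashMap.newKeySet()`, whose `add` returns `true` only for the first insertion of a given element. The class `DedupSketch`, the method `addConcurrently`, and the `"key-"` values are hypothetical, and a parallel stream stands in for the real worker threads.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class DedupSketch {
    // Returns {number of adds that inserted a new element, final set size}.
    static int[] addConcurrently(int workers, int perWorker, int distinctKeys) {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger firstTimeAdds = new AtomicInteger();

        // forEach on a parallel stream returns only after all workers are done.
        IntStream.range(0, workers).parallel().forEach(w -> {
            for (int i = 0; i < perWorker; i++) {
                // add() is atomic: it returns true for exactly one caller per key.
                if (seen.add("key-" + (i % distinctKeys))) {
                    firstTimeAdds.incrementAndGet();
                }
            }
        });
        return new int[] { firstTimeAdds.get(), seen.size() };
    }

    public static void main(String[] args) {
        int[] result = addConcurrently(10, 5000, 999);
        System.out.println(result[0] + " first-time adds, size " + result[1]);
    }
}
```

With these arguments both numbers should be 999, because each distinct key is inserted exactly once no matter how many workers try to add it.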
It's a bit difficult to show the code because it is part of a very large package, but in general it looks like this:
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ConcurrentSkipListSet;
    import java.util.concurrent.RecursiveTask;

    public class MyRecursiveTask extends RecursiveTask<Integer> {
        private List<String> tasks;
        protected ConcurrentSkipListSet<String> dictionary;

        public MyRecursiveTask(ConcurrentSkipListSet<String> dictionary,
                               List<String> tasks) {
            this.dictionary = dictionary;
            this.tasks = tasks;
        }

        @Override
        protected Integer compute() {
            if (this.tasks.size() > 100) {
                // Too many tasks: split in two, fork both halves, then sum the counts.
                List<MyRecursiveTask> subtasks = new ArrayList<>();
                subtasks.addAll(createSubtasks());
                int count = 0;
                for (MyRecursiveTask subtask : subtasks)
                    subtask.fork();
                for (MyRecursiveTask subtask : subtasks)
                    count += subtask.join();
                return count;
            } else {
                // Small enough: process directly and add each outcome to the shared set.
                int count = 0;
                for (String task : tasks) {
                    // code to process task
                    String outcome = [method to do some task]
                    dictionary.add(outcome);
                    count++;
                }
                return count;
            }
        }

        // Splits the task list into two halves that share the same dictionary.
        private List<MyRecursiveTask> createSubtasks() {
            List<MyRecursiveTask> subtasks = new ArrayList<>();
            int total = tasks.size() / 2;

            List<String> tasks1 = new ArrayList<>();
            for (int i = 0; i < total; i++)
                tasks1.add(tasks.get(i));
            MyRecursiveTask subtask1 = new MyRecursiveTask(dictionary, tasks1);

            List<String> tasks2 = new ArrayList<>();
            for (int i = total; i < tasks.size(); i++)
                tasks2.add(tasks.get(i));
            MyRecursiveTask subtask2 = new MyRecursiveTask(dictionary, tasks2);

            subtasks.add(subtask1);
            subtasks.add(subtask2);
            return subtasks;
        }
    }
Then the code that creates the pool and runs these workers:
    ....
    List<String> allTasks = new ArrayList<String>(100000);
    ....
    // code to fill in "allTasks"
    ....
    ConcurrentSkipListSet<String> dictionary = new ConcurrentSkipListSet<>();
    // I also tried "dictionary = Collections.synchronizedSet(new HashSet<>())"
    // and changed other bits of code accordingly.
    ForkJoinPool forkJoinPool = new ForkJoinPool(10);
    MyRecursiveTask mrt = new MyRecursiveTask(dictionary,
    );
    int total = forkJoinPool.invoke(mrt);
    System.out.println(dictionary.size()); // This value is a bit random. If the real
    // size should be 999, the first run may give 989, the second 999,
    // the third 990, etc.
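Since a List or another collection would also be acceptable, one alternative shape for the same fork/join structure is to avoid the shared collection entirely: each leaf task fills its own local HashSet, and parents merge the sets of their children. This is a sketch of a different technique, not the code above. `MergingTask`, `process`, and `run` are hypothetical names, and `process` is only a stand-in for the real per-task work.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class MergingTask extends RecursiveTask<Set<String>> {
    private static final int THRESHOLD = 100; // same cut-off as in the question
    private final List<String> tasks;

    MergingTask(List<String> tasks) {
        this.tasks = tasks;
    }

    @Override
    protected Set<String> compute() {
        if (tasks.size() > THRESHOLD) {
            int mid = tasks.size() / 2;
            MergingTask left = new MergingTask(tasks.subList(0, mid));
            MergingTask right = new MergingTask(tasks.subList(mid, tasks.size()));
            left.fork();                          // run the left half asynchronously
            Set<String> merged = right.compute(); // run the right half in this thread
            merged.addAll(left.join());           // duplicates collapse during the merge
            return merged;
        }
        Set<String> local = new HashSet<>();      // touched by this task only
        for (String task : tasks) {
            local.add(process(task));
        }
        return local;
    }

    // Hypothetical stand-in for the real processing method.
    static String process(String task) {
        return task.toLowerCase();
    }

    // Runs the whole list on a pool with the given parallelism.
    static Set<String> run(List<String> allTasks, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.invoke(new MergingTask(allTasks));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> allTasks = new ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            allTasks.add("Item-" + (i % 999)); // 999 distinct values with repeats
        }
        System.out.println(run(allTasks, 10).size());
    }
}
```

With the sample input in `main`, the merged set should contain 999 elements on every run. No set is ever written by two threads at once, so this design does not depend on a concurrent Set at all, at the cost of the extra merge work in each parent task.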
thanks