I read from here that there are several different thread-safe Set implementations. In my application, I have 10 threads concurrently adding things to one collection (it does not have to be a Set, but that would be better). After all threads finish, I need to iterate through the collection.

I read that ConcurrentSkipListSet and Collections.newSetFromMap(new ConcurrentHashMap()) both have inconsistent batch operations (addAll, removeAll, etc.) and Iterators. My experiment also confirms this. When I use ConcurrentSkipListSet, the result read after all threads have added their items is somewhat random: the size of the set differs from run to run.

I then tried Collections.synchronizedSet(new HashSet<>()), which I assumed would be thread safe because it blocks concurrent write access. However, it seems to have the same inconsistent reading issue: the resulting set still has a different size from run to run.

What should I do to make sure the reading is consistent? As I said, I do not have to use a Set. I can use a List or something else, as long as there is a way to avoid adding duplicates.

It's a bit difficult to show the code, as it is part of a very large package, but in general it looks like this:

public class MyRecursiveTask extends RecursiveTask<Integer> {
    private List<String> tasks; 
    protected ConcurrentSkipListSet<String> dictionary;
    public MyRecursiveTask(ConcurrentSkipListSet<String> dictionary,
                           List<String> tasks){
        this.dictionary=dictionary;
        this.tasks=tasks;
    }

    protected Integer compute() {
        if (this.tasks.size() > 100) {
            List<MyRecursiveTask> subtasks =
                new ArrayList<>();
            subtasks.addAll(createSubtasks());
            int count=0;
            for (MyRecursiveTask subtask : subtasks)
                subtask.fork();
            for (MyRecursiveTask subtask : subtasks)
                count+=subtask.join();
            return count;
        } else {
            int count=0;
            for (String task : tasks) {
                    // code to process task
                 String outcome = [method to do some task]
                 dictionary.add(outcome);
                 count++;
            }
            return count;
        }
    }

    private List<MyRecursiveTask> createSubtasks() {
        List<MyRecursiveTask> subtasks =
            new ArrayList<>();

        int total = tasks.size() / 2;
        List<String> tasks1 = new ArrayList<>();
        for (int i = 0; i < total; i++)
            tasks1.add(tasks.get(i));
        MyRecursiveTask subtask1 = new MyRecursiveTask(
            dictionary, tasks1);

        List<String> tasks2 = new ArrayList<>();
        for (int i = total; i < tasks.size(); i++)
            tasks2.add(tasks.get(i));
        MyRecursiveTask subtask2 = new MyRecursiveTask(
            dictionary, tasks2);

        subtasks.add(subtask1);
        subtasks.add(subtask2);

        return subtasks;
    }
}

Then the code that creates a list of such threaded workers:

....
List<String> allTasks = new ArrayList<String>(100000);
....
//code to fill in "allTasks"
....

ConcurrentSkipListSet<String> dictionary = new ConcurrentSkipListSet<>();
//I also tried "dictionary = Collections.synchronizedSet(new
//HashSet<>())" and changed other bits of code accordingly.
ForkJoinPool forkJoinPool = new ForkJoinPool(10);
MyRecursiveTask mrt = new MyRecursiveTask (dictionary,
            );
int total= forkJoinPool.invoke(mrt);
System.out.println(dictionary.size()); //this value is a bit random. If real     
//size should be 999, when I run the code once i may get 989; second i may 
//get 999; third I may get 990 etc....

thanks

Ziqi
  • Please give us a piece of code that displays your problem, together with expected and actual output. – Keppil Aug 10 '15 at 20:05
  • You're probably confusing data race with a general race condition. Concurrent classes only save you from the former, but your code suffers from the latter. – Marko Topolnik Aug 10 '15 at 20:18
  • just added some code. I doubt the race condition. The only operation on the concurrent set is adding things to it, and there is no other checking on the elements inside the set. – Ziqi Aug 10 '15 at 20:29

1 Answer

Without seeing the code, it is hard to tell what is wrong. My guess is that the thread that reads the result runs too early, while some threads are still writing. Use Thread.join to wait for the writers to finish. Collections.synchronizedSet itself is thread safe.
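
To illustrate the "wait for the writers" idea, here is a minimal sketch. It is not the asker's code: the class name JoinBeforeRead, the helper fillAndCount and the "item-" keys are made up for the example. Ten plain threads add overlapping strings to a synchronized set, and the size is read only after every writer has been joined.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class JoinBeforeRead {

    // Hypothetical helper: starts `writers` threads that each add the same
    // `perThread` keys, waits for all of them, then returns the set size.
    static int fillAndCount(int writers, int perThread) {
        Set<String> dictionary = Collections.synchronizedSet(new HashSet<>());
        List<Thread> threads = new ArrayList<>();

        for (int w = 0; w < writers; w++) {
            Thread t = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    // Duplicates added by other threads are simply ignored by the set.
                    dictionary.add("item-" + i);
                }
            });
            threads.add(t);
            t.start();
        }

        // Read the result only after every writer has terminated.
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for writers", e);
            }
        }
        return dictionary.size();
    }

    public static void main(String[] args) {
        System.out.println(fillAndCount(10, 999)); // prints 999
    }
}
```

If the size still varies after all writers have been joined, the collection is probably not the cause, and it may be worth checking whether the code that produces the added values is itself thread safe.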

Consider this from the Javadoc:

It is imperative that the user manually synchronize on the returned set when iterating over it:

   Set s = Collections.synchronizedSet(new HashSet());
       ...
   synchronized (s) {
       Iterator i = s.iterator(); // Must be in the synchronized block
       while (i.hasNext())
           foo(i.next());
   }

Failure to follow this advice may result in non-deterministic behavior. The returned set will be serializable if the specified set is serializable.
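
The same advice with generics, as a small sketch. The class name SynchronizedIteration and the helper snapshot are hypothetical, not part of the JDK. The helper copies the elements while holding the set's own lock, so the copy can afterwards be iterated freely without further locking.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SynchronizedIteration {

    // Hypothetical helper: iterates a set returned by Collections.synchronizedSet
    // inside a synchronized block, as the Javadoc requires, and returns a copy.
    static List<String> snapshot(Set<String> syncSet) {
        List<String> copy = new ArrayList<>();
        synchronized (syncSet) { // lock on the wrapper itself, as in the Javadoc example
            for (String s : syncSet) {
                copy.add(s);
            }
        }
        return copy;
    }

    public static void main(String[] args) {
        Set<String> dictionary = Collections.synchronizedSet(new HashSet<>());
        dictionary.add("a");
        dictionary.add("b");
        dictionary.add("a"); // duplicate, ignored
        System.out.println(snapshot(dictionary).size()); // prints 2
    }
}
```

The explicit lock only matters when other threads may still be modifying the set during iteration. Once all writers have finished, a plain loop over the set sees the same elements.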

Gábor Lipták