0

I have a task to execute some collection-related logic in a parallel threads and compare it with the single thread mode. From this question multithreading to read a file in Java I've noticed that File reading is not a task for multi-threading, so I decided to focus on the further logic. The logic is the following:

  public List<?> taskExecution(File file, boolean parallel) {
    List<Entry<String, Integer>> entryList = new ArrayList<>();
    try {
        if (parallel) {
            entryList = taskExecutionInParallel(file);
        } else {
            // put in the map the words and their occurrence 
            Map<String, Integer> wordsFrequency = new HashMap<>();
            for(String word : this.readWordsFromText(file, parallel)) {
                if (wordsFrequency.containsKey(word)) {
                    wordsFrequency.put(word, wordsFrequency.get(word).intValue() + 1);
                } else {
                    wordsFrequency.put(word, 1);
                }
            }

            // create the list of Map.Entry objects
            entryList.addAll(wordsFrequency.entrySet());

            // sort the entries by the value descending
            Collections.sort(entryList, new Comparator<Entry<String, Integer>>(){

                @Override
                public int compare(Entry<String, Integer> o1,
                        Entry<String, Integer> o2) {
                    return o2.getValue().compareTo(o1.getValue());
                }

            });

            // identify the top index
            int topIndex = entryList.size() > 1 ? 2 : entryList.size() > 0 ? 1 : 0;

            // truncate the list
            entryList = entryList.subList(0, topIndex);

            // sort the result list by the words descending
            Collections.sort(entryList, new Comparator<Entry<String, Integer>>(){

                @Override
                public int compare(Entry<String, Integer> o1,
                        Entry<String, Integer> o2) {
                    return o2.getKey().compareTo(o1.getKey());
                }

            });
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    return entryList;
}

I'm trying to execute the transformation from the initial words list to the map with words' frequencies with Fork/Join framework:

class ForkJoinFrequencyReader extends RecursiveAction {

    static final int SEQUENTIAL_THRESHOLD = 1000;

    private static final long serialVersionUID = -7784403215745552735L;
    private Map<String, Integer> wordsFrequency;
    private final int start;
    private final int end;
    private final List<String> words;

    public ForkJoinFrequencyReader(List<String> words, Map<String, Integer> wordsFrequency) {
        this(words, 0, words.size(), wordsFrequency);
    }

    private ForkJoinFrequencyReader(List<String> words, int start, int end, Map<String, Integer> wordsFrequency) {
        this.words = words;
        this.start = start;
        this.end = end;
        this.wordsFrequency = wordsFrequency;
    }

    private synchronized void putInMap() {
        for(int i = start; i < end; i++) {
            String word = words.get(i);
            if (wordsFrequency.containsKey(word)) {
                wordsFrequency.put(word, wordsFrequency.get(word).intValue() + 1);
            } else {
                wordsFrequency.put(word, 1);
            }
        }
    }

    @Override
    protected void compute() {
        if (end - start < SEQUENTIAL_THRESHOLD) {
            putInMap();
        } else {
            int mid = (start + end) >>> 1;
            ForkJoinFrequencyReader left = new ForkJoinFrequencyReader(words, start, mid, wordsFrequency);
            ForkJoinFrequencyReader right = new ForkJoinFrequencyReader(words, mid, end, wordsFrequency);
            left.fork();
            right.fork();
            left.join();
            right.join();
        }
    }

}

private List<Entry<String, Integer>> taskExecutionInParallel(File file) throws IOException {
    List<Entry<String, Integer>> entryList = new CopyOnWriteArrayList<>();

    ForkJoinPool pool = new ForkJoinPool();
    Map<String, Integer> wordsFrequency = new ConcurrentHashMap<>();
    pool.invoke(new ForkJoinFrequencyReader(Collections.synchronizedList(this.readWordsFromText(file, true)), wordsFrequency));

 //****** .... the same single-thread code yet
}

But, the resulted map has a different values after the each execution. Could someone point me where is the bottleneck or propose some other solutions to embed concurrency using standard JDK up to the version 7?

Community
  • 1
  • 1
kolya_metallist
  • 589
  • 9
  • 20

3 Answers3

0

You should probably use the parallel exeuction capabilities of Java 8 streams:

Path path = FileSystems.getDefault().getPath(...);
Stream<String> words = Files.lines(path);
Map<String, Long> wordsFrequency = words.parallel()
    .collect(Collectors.groupingBy(UnaryOperator.identity(),
                                   Collectors.counting()));
isnot2bad
  • 24,105
  • 2
  • 29
  • 50
  • thanks, but as I mentioned before I needed a solution in Java 7, the solution using Java 8 Streams API is another task and I've already done it. It looks like: – kolya_metallist Mar 29 '15 at 18:10
  • `list = Collections.synchronizedList(this.readWordsFromText(file, parallel)).parallelStream().collect(groupingByConcurrent(e -> e, counting())).entrySet().parallelStream().sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue())).limit(2).sorted((e1, e2) -> e2.getKey().compareTo(e1.getKey())).collect(toList());` – kolya_metallist Mar 29 '15 at 18:34
0

Your putInMap is synchronized on concrete ForkJoinFrequencyReader instance. In the same time you create different instances of ForkJoinFrequencyReader in compute method. So your synchronization simply doesn't work because each one is related to it's own instance. To check it just replace your putInMap on

private void putInMap() {
    synchronized (wordsFrequency) {

Read this one for example: http://www.cs.umd.edu/class/fall2013/cmsc433/examples/wordcount/WordCountParallel.java

rygel
  • 48
  • 5
  • Thanks, your remark helped me. It works now. I'm also thinking that considering a BlockingQueue [link](http://www.ibm.com/developerworks/library/j-5things4/) could be a solution for this task. – kolya_metallist Mar 30 '15 at 14:35
0

I've implemented also the Producer-Consumer Pattern for the word frequency block:

private Map<String, Integer> frequencyCounterInParallel(File file) throws InterruptedException {
    Map<String, Integer> wordsFrequency = Collections.synchronizedMap(new LinkedHashMap<>());
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
    Thread producer = new Thread(new Producer(queue, file));
    Thread consumer = new Thread(new Consumer(queue, wordsFrequency));
    producer.start();
    consumer.start();
    producer.join();
    consumer.join();
    return wordsFrequency;
}

class Producer implements Runnable {

    private BlockingQueue<String> queue;
    private File file;

    public Producer(BlockingQueue<String> queue, File file) {
        this.file = file;
        this.queue = queue;
    }

    @Override
    public void run() { 
        try(BufferedReader bufferReader = Files.newBufferedReader(file.toPath())) {
            String line = null;
            while ((line = bufferReader.readLine()) != null){
                String[] lineWords = line.split(CommonConstants.SPLIT_TEXT_REGEX); 
                for(String word : lineWords) {
                    if (word.length() > 0) {
                        queue.put(word.toLowerCase());
                    }
                }
            }
            queue.put(STOP_THREAD);
        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable {


    private BlockingQueue<String> queue;
    private Map<String, Integer> wordsFrequency;

    public Consumer(BlockingQueue<String> queue, Map<String, Integer> wordsFrequency) {
        this.queue = queue;
        this.wordsFrequency = wordsFrequency;
    }

    @Override
    public void run() {
        try {
            String word = null;
            while(!((word = queue.take()).equals(STOP_THREAD))) {
                if (wordsFrequency.containsKey(word)) {
                    wordsFrequency.put(word, wordsFrequency.get(word).intValue() + 1);
                } else {
                    wordsFrequency.put(word, 1);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }   
    }

}
kolya_metallist
  • 589
  • 9
  • 20