
I am trying to get faster output through threads. I'm just doing a small POC of sorts.
Suppose I have a problem statement: find all the numbers in an array that occur an odd number of times. Following are my attempts, both sequential and parallel.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class Test1 {

    final static Map<Integer, Integer> mymap  = new HashMap<>();

    static Map<Integer, AtomicInteger> mymap1 = new ConcurrentHashMap<>();

    public static void generateData(final int[] arr) {
        final Random aRandom = new Random();
        for (int i = 0; i < arr.length; i++) {
            arr[i] = aRandom.nextInt(10);
        }
    }

    public static void calculateAllOddOccurrence(final int[] arr) {

        for (int i = 0; i < arr.length; i++) {
            if (mymap.containsKey(arr[i])) {
                mymap.put(arr[i], mymap.get(arr[i]) + 1);
            } else {
                mymap.put(arr[i], 1);
            }
        }

        for (final Map.Entry<Integer, Integer> entry : mymap.entrySet()) {
            if (entry.getValue() % 2 != 0) {
                System.out.println(entry.getKey() + "=" + entry.getValue());
            }

        }

    }

    public static void calculateAllOddOccurrenceThread(final int[] arr) {

        final ExecutorService executor = Executors.newFixedThreadPool(10);
        final List<Future<?>> results = new ArrayList<>();
        final int range = arr.length / 10;
        for (int count = 0; count < 10; ++count) {
            final int startAt = count * range;
            final int endAt = startAt + range;
            executor.submit(() -> {
                for (int i = startAt; i < endAt; i++) {
                    if (mymap1.containsKey(arr[i])) {
                        final AtomicInteger accumulator = mymap1.get(arr[i]);
                        accumulator.incrementAndGet();
                        mymap1.put(arr[i], accumulator);
                    } else {
                        mymap1.put(arr[i], new AtomicInteger(1));
                    }
                }
            });
        }

        awaitTerminationAfterShutdown(executor);

        for (final Entry<Integer, AtomicInteger> entry : mymap1.entrySet()) {
            if (entry.getValue().get() % 2 != 0) {
                System.out.println(entry.getKey() + "=" + entry.getValue());
            }

        }

    }

    public static void calculateAllOddOccurrenceStream(final int[] arr) {

        final ConcurrentMap<Integer, List<Integer>> map2 = Arrays.stream(arr).parallel().boxed()
                .collect(Collectors.groupingByConcurrent(i -> i));
        map2.entrySet().stream().parallel()
                .filter(e -> e.getValue().size() % 2 != 0)
                .forEach(entry -> System.out.println(entry.getKey() + "=" + entry.getValue().size()));

    }

    public static void awaitTerminationAfterShutdown(final ExecutorService threadPool) {
        threadPool.shutdown();
        try {
            if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
                threadPool.shutdownNow();
            }
        } catch (final InterruptedException ex) {
            threadPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(final String... doYourBest) {

        final int[] arr = new int[200000000];

        generateData(arr);
        long starttime = System.currentTimeMillis();
        calculateAllOddOccurrence(arr);

        System.out.println("Total time=" + (System.currentTimeMillis() - starttime));

        starttime = System.currentTimeMillis();
        calculateAllOddOccurrenceStream(arr);

        System.out.println("Total time Thread=" + (System.currentTimeMillis() - starttime));

    }

}

Output:

1=20003685
2=20000961
3=19991311
5=20006433
7=19995737
8=19999463
Total time=3418
5=20006433
7=19995737
1=20003685
8=19999463
2=20000961
3=19991311
Total time Thread=19640

Parallel execution (calculateAllOddOccurrenceStream) is taking more time. What is the best way to process an array in parallel and then merge the results?

My goal is not to find the fastest algorithm, but to take any algorithm and run it on different threads such that they process different parts of the array simultaneously.

Dhananjay
  • Have you considered the `Stream` API in Java 8+? – Thomas Timbul May 25 '18 at 09:56
  • Yes, but I was not able to solve it that way. Can you help? – Dhananjay May 25 '18 at 09:58
  • If you show me your existing Streams API based code, I'll help you out. But as a hint to where you're going wrong, you're overwriting the values in mymap1 in parallel within your loop. Using `ConcurrentHashMap` doesn't change the fact that "last put wins". – Thomas Timbul May 25 '18 at 10:07

3 Answers


It seems that those threads are working on the same parts of the array simultaneously, hence the answer is not correct.

Rather, divide the array into parts with proper start and end indexes. Allocate separate threads to process these parts and count the occurrences of each number in each of those parts.

At the end, you would have multiple maps having counts calculated from those separate parts. Merge those maps to get the final answer.
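The split, count, then merge approach described above can be sketched with an `ExecutorService`. This is a minimal sketch, not the answerer's code: the class `ChunkedOddOccurrence` and method `countInParallel` are hypothetical names. Each task gets a disjoint slice and its own private `HashMap`, so the counting phase needs no synchronization at all. Only the final merge touches a shared result, and it runs on the calling thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ChunkedOddOccurrence {

    // Hypothetical helper: splits arr into `parts` contiguous slices, counts each slice
    // in its own task using a task-local HashMap, then merges the partial maps.
    public static Map<Integer, Integer> countInParallel(int[] arr, int parts) {
        ExecutorService executor = Executors.newFixedThreadPool(parts);
        try {
            int chunk = (arr.length + parts - 1) / parts; // round up so no element is skipped
            List<Future<Map<Integer, Integer>>> partials = new ArrayList<>();
            for (int p = 0; p < parts; p++) {
                int start = Math.min(p * chunk, arr.length);
                int end = Math.min(start + chunk, arr.length);
                partials.add(executor.submit(() -> {
                    Map<Integer, Integer> local = new HashMap<>(); // never shared, so no locking
                    for (int i = start; i < end; i++) {
                        local.merge(arr[i], 1, Integer::sum);
                    }
                    return local;
                }));
            }
            Map<Integer, Integer> total = new HashMap<>();
            for (Future<Map<Integer, Integer>> partial : partials) {
                // get() waits for the task; the merge is single-threaded, so HashMap is fine
                partial.get().forEach((k, v) -> total.merge(k, v, Integer::sum));
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] arr = {1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5};
        countInParallel(arr, 4).forEach((k, v) -> {
            if (v % 2 != 0) {
                System.out.println(k + "=" + v);
            }
        });
    }
}
```

Rounding the chunk size up (rather than using `arr.length / 10`) means trailing elements are not dropped when the length is not a multiple of the number of parts.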

OR you could have a single ConcurrentHashMap for storing the counts coming from all those threads, but a bug can creep in there: each individual put on a ConcurrentHashMap is thread-safe, but a get-then-put sequence is not atomic, so two threads can read the same old count and one update is lost. For guaranteed behaviour, rely on the atomicity of the ConcurrentHashMap.putIfAbsent(K key, V value) method and pay attention to its return value, which tells you whether the put actually happened. A simple put might not be correct. See https://stackoverflow.com/a/14947844/945214
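A sketch of the putIfAbsent pattern described above, assuming one shared `ConcurrentHashMap<Integer, AtomicInteger>`. The class `PutIfAbsentCounter` and its methods are hypothetical, and a parallel `IntStream` is used here only as a convenient way to run the updates on several threads. The key point is using the return value of `putIfAbsent`: if it is non-null, another thread installed a counter first, and we must increment that one instead of our own.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class PutIfAbsentCounter {

    // Thread-safe "increment or insert". putIfAbsent returns null if our counter was
    // stored, or the counter another thread stored first; we increment whichever is in the map.
    static void increment(ConcurrentMap<Integer, AtomicInteger> counts, int key) {
        AtomicInteger counter = counts.get(key);
        if (counter == null) {
            AtomicInteger fresh = new AtomicInteger();
            AtomicInteger existing = counts.putIfAbsent(key, fresh);
            counter = (existing == null) ? fresh : existing;
        }
        counter.incrementAndGet(); // atomic increment, so no lost updates
    }

    static ConcurrentMap<Integer, AtomicInteger> count(int[] arr) {
        ConcurrentMap<Integer, AtomicInteger> counts = new ConcurrentHashMap<>();
        // A parallel IntStream runs the updates on several fork/join threads at once
        IntStream.range(0, arr.length).parallel().forEach(i -> increment(counts, arr[i]));
        return counts;
    }

    public static void main(String[] args) {
        int[] arr = new int[1_000_000];
        for (int i = 0; i < arr.length; i++) {
            arr[i] = i % 10;
        }
        count(arr).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```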

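You could use the Java 8 Streams API (https://www.journaldev.com/2774/java-8-stream) to write the code, or plain threading code using Java 5 constructs would also do.

Added Java 8 stream code. Notice the timing differences: starting from an ArrayList instead of an array makes a difference. Note that the list is built (and its ints boxed) in main, outside the timed region, whereas the array version pays for boxing inside its timing: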

package com.test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Function;
import java.util.stream.Collectors;

public class Test {

    public static void generateData(final int[] arr) {
        final Random aRandom = new Random();
        for (int i = 0; i < arr.length; i++) {
            arr[i] = aRandom.nextInt(10);
        }
    }

    public static void calculateAllOddOccurrence(final int[] arr) {
        final Map<Integer, Integer> mymap  = new HashMap<>();
        for (int i = 0; i < arr.length; i++) {
            if (mymap.containsKey(arr[i])) {
                mymap.put(arr[i], mymap.get(arr[i]) + 1);
            } else {
                mymap.put(arr[i], 1);
            }
        }
        for (final Map.Entry<Integer, Integer> entry : mymap.entrySet()) {
            if (entry.getValue() % 2 != 0) {
                System.out.println(entry.getKey() + "=" + entry.getValue());
            }

        }
    }

    public static void calculateAllOddOccurrenceStream(int[] arr) {
        Arrays.stream(arr).boxed()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
                .entrySet().parallelStream()
                .filter(e -> e.getValue() % 2 != 0)
                .forEach(entry -> System.out.println(entry.getKey() + "=" + entry.getValue()));
    }

    public static void calculateAllOddOccurrenceStream(List<Integer> list) {
        list.parallelStream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
                .entrySet().parallelStream()
                .filter(e -> e.getValue() % 2 != 0)
                .forEach(entry -> System.out.println(entry.getKey() + "=" + entry.getValue()));
    }

    public static void main(final String... doYourBest) {

        final int[] arr = new int[200000000];

        generateData(arr);
        long starttime = System.currentTimeMillis();
        calculateAllOddOccurrence(arr);
        System.out.println("Total time with simple map=" + (System.currentTimeMillis() - starttime));

        List<Integer> list = Arrays.stream(arr).boxed().collect(Collectors.toList());
        starttime = System.currentTimeMillis();
        calculateAllOddOccurrenceStream(list);
        System.out.println("Total time stream - with a readymade list, which might be the case for most apps as arraylist is more easier to work with =" + (System.currentTimeMillis() - starttime));

        starttime = System.currentTimeMillis();
        calculateAllOddOccurrenceStream(arr);
        System.out.println("Total time Stream with array=" + (System.currentTimeMillis() - starttime));

    }
}

OUTPUT


0=19999427
2=20001707
4=20002331
5=20001585
7=20001859
8=19993989
Total time with simple map=2813
4=20002331
0=19999427
2=20001707
7=20001859
8=19993989
5=20001585
Total time stream - with a readymade list, which might be the case for most apps as arraylist is more easier to work with = 3328
8=19993989
7=20001859
0=19999427
4=20002331
2=20001707
5=20001585
Total time Stream with array=6115
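Both stream versions above box every `int` into an `Integer` (via `boxed()` or the pre-built list). As a hedged sketch of a parallel version that never boxes, `IntStream.collect(supplier, accumulator, combiner)` gives each sub-task its own `int[]` of counters and merges them pairwise. This assumes, as in `generateData`, that every value lies in 0..9; the class `PrimitiveParallelCount` and the constant `BUCKETS` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Random;

public class PrimitiveParallelCount {

    // Assumption: every value is in [0, BUCKETS), as produced by nextInt(10) in generateData
    static final int BUCKETS = 10;

    static int[] count(int[] arr) {
        return Arrays.stream(arr).parallel().collect(
                () -> new int[BUCKETS],          // one private counter array per sub-task
                (counts, v) -> counts[v]++,      // accumulate a single value, no boxing
                (left, right) -> {               // merge two partial results
                    for (int i = 0; i < BUCKETS; i++) {
                        left[i] += right[i];
                    }
                });
    }

    public static void main(String[] args) {
        int[] arr = new int[10_000_000];
        Random random = new Random(42);
        for (int i = 0; i < arr.length; i++) {
            arr[i] = random.nextInt(BUCKETS);
        }
        int[] counts = count(arr);
        for (int v = 0; v < BUCKETS; v++) {
            if (counts[v] % 2 != 0) {
                System.out.println(v + "=" + counts[v]);
            }
        }
    }
}
```

The design trades generality for speed: a fixed-size `int[]` only works when the value range is small and known in advance; for arbitrary values a map-based collector is still needed.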
gargkshitiz
  • "but a bug could creep in there I guess as there would be still concurrent write conflicts." - what? The point of `ConcurrentHashMap` is that it is thread-safe and can be written to by multiple threads at the same time. – Jaroslaw Pawlak May 25 '18 at 10:07
  • In a highly multi-threaded environment, writes on a ConcurrentHashMap might not be 100% safe. For guaranteed write behaviour, the correct way is to use the atomicity of the ConcurrentHashMap.putIfAbsent(K key, V value) method and pay attention to the return value, which tells you whether the put operation was successful. A simple put might not be correct. See https://stackoverflow.com/a/14947844/945214 – gargkshitiz May 25 '18 at 10:13
  • The 'bug' is that a sequence of "get then put" is never atomic. It always requires external synchronization. – Thomas Timbul May 25 '18 at 10:14
  • Ok, good point. It would be nice to update the answer to be clearer on that. – Jaroslaw Pawlak May 25 '18 at 10:18
  • I tried with streams, using the solution provided by @thomas-timbul. But using streams and concurrent hash maps is way slower than the imperative way. Can you illustrate why? – Dhananjay May 28 '18 at 05:12
  • Done. calculateAllOddOccurrence is the imperative way and takes roughly 4 seconds. calculateAllOddOccurrenceStream is the declarative stream way, which takes around 19 seconds. – Dhananjay May 28 '18 at 05:32
  • I have updated my answer, please upvote and accept if it helped. – gargkshitiz May 28 '18 at 12:06
  • Thanks. For some inputs it's actually faster. So is there a difference between Arrays.stream and a collection stream? Do you know of any link that goes into depth? Thanks again. – Dhananjay May 28 '18 at 18:12
  • `calculateAllOddOccurrenceStream(int[])` has to box all the `int`s with `boxed()`. In `calculateAllOddOccurrenceStream(List)` you have already provided a list of `Integer`, so the conversion is not required - no surprise that it is faster, because it does less work. – Thomas Timbul May 29 '18 at 10:40
  • Yes, that is why I had said "ArrayList (instead) of an array makes a difference:". In most code bases, we tend to use ArrayList as that is easier to work with. I wanted to demonstrate that if we already have a list, then it may be faster. – gargkshitiz May 29 '18 at 10:43
  • @gargkshitiz No, it is not the fact that it is a list vs array that makes the difference. It is the fact that the conversion from `int` to `Integer` takes time. This time is included in the timing for `calculateAllOddOccurrenceStream(int[])`, but in `calculateAllOddOccurrenceStream(List)` no boxing is required. The boxing is performed within the main method. That isn't fair. Time has been spent to convert the int array into a form usable by that method, but doesn't count against that method? – Thomas Timbul May 29 '18 at 10:45
  • To recap on the performance question: 1) int + for loop can be compiler optimised and primitive access is blazingly fast. 2) Boxing int to Integer requires time. 3) If in one method you include the time to box, but in the other you don't, naturally the latter will appear faster. That's a flaw in the test itself. – Thomas Timbul May 29 '18 at 10:48
  • In this case, yes, that time is intentionally left out. It is just to demonstrate to the OP that having a list beforehand can give an edge when working with streams. This might be a sample program; a List might very well be the actual input instead of an array in real business use cases. I am not trying to hack the question, I am just giving a perspective to the OP. – gargkshitiz May 29 '18 at 10:49
  • Again, it's not the List. Have you tried with `Arrays.stream(Integer[])`? – Thomas Timbul May 29 '18 at 10:54
  • I have updated my answer to explicitly mention the timing nuances – gargkshitiz May 29 '18 at 11:24

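You are looking for the Streams API introduced in Java 8: http://www.baeldung.com/java-8-streams. A parallel stream splits its source into chunks, processes them on several threads (by default the common fork/join pool) and combines the partial results, which is the split-then-merge pattern you describe.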

Example:

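// myList is a List; for an int[] use Arrays.stream(myArray) and Arrays.stream(myArray).parallel() instead

// sequential processing
myList.stream().filter( ... ).map( ... ).collect(Collectors.toList());

// parallel processing
myList.parallelStream().filter( ... ).map( ... ).collect(Collectors.toList());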
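To give the snippet above some concrete context, here is a small runnable sketch that fills in the `filter( ... )` and `map( ... )` placeholders with hypothetical operations (keep odd numbers, square them). The class `ParallelStreamExample` and method `oddSquares` are illustrative names. It shows that switching from `stream()` to `parallelStream()` does not change the result: because the source list is ordered, `Collectors.toList()` preserves encounter order even when the work is split across threads.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelStreamExample {

    static List<Integer> oddSquares(List<Integer> numbers, boolean parallel) {
        return (parallel ? numbers.parallelStream() : numbers.stream())
                .filter(n -> n % 2 != 0)        // keep odd numbers
                .map(n -> n * n)                // square them
                .collect(Collectors.toList());  // encounter order is preserved, even in parallel
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        System.out.println(oddSquares(numbers, false)); // prints [1, 9, 25, 49]
        System.out.println(oddSquares(numbers, true));  // prints [1, 9, 25, 49]
    }
}
```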
Guilherme Mussi
  • [How do I write a good answer?](https://stackoverflow.com/help/how-to-answer): ***Provide context for links*** - *Links to external resources are encouraged, but please add context around the link so your fellow users will have some idea what it is and why it’s there.* ***Always quote the most relevant part of an important link, in case the target site is unreachable or goes permanently offline.*** – BackSlash May 25 '18 at 09:57

Looking at your code, you're going wrong with this line:

mymap1.put(arr[i], mymap1.get(arr[i]) + 1);

You are overwriting the values in parallel, for example:

Thread 1 'get' = 0
Thread 2 'get' = 0
Thread 1 'put 1' 
Thread 2 'put 1'

Change your map to:

static Map<Integer, AtomicInteger> mymap1 = new ConcurrentHashMap<>();
static {
    //initialize to avoid null values and non-synchronized puts from different Threads
    for(int i=0;i<10;i++) {
        mymap1.put(i, new AtomicInteger());
    }
}
....
    //in your loop
    for (int i = 0; i < arr.length; i++) {
        AtomicInteger accumulator = mymap1.get(arr[i]);
        accumulator.incrementAndGet();
    }

Edit: The problem with the above approach is, of course, the initialization of mymap1. To avoid falling into the same trap (creating an AtomicInteger within the loop and threads overwriting each other yet again), it needs to be prefilled with values. That works here only because generateData produces nothing but the values 0 to 9.
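If prefilling is not possible (for example, when the value range is unknown), one alternative is `ConcurrentHashMap.computeIfAbsent`, which the `ConcurrentHashMap` documentation describes as atomic, with the mapping function applied at most once per key. This is a hedged sketch swapping in that technique rather than the prefill above; the class `ComputeIfAbsentCounter` is a hypothetical name, and the parallel `IntStream` just stands in for "several threads updating the map".

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class ComputeIfAbsentCounter {

    static Map<Integer, AtomicInteger> count(int[] arr) {
        ConcurrentHashMap<Integer, AtomicInteger> counts = new ConcurrentHashMap<>();
        IntStream.range(0, arr.length).parallel().forEach(i ->
                // ConcurrentHashMap.computeIfAbsent is atomic: the mapping function runs at most
                // once per key, so every thread increments the same AtomicInteger for that key
                counts.computeIfAbsent(arr[i], k -> new AtomicInteger()).incrementAndGet());
        return counts;
    }

    public static void main(String[] args) {
        // Works for arbitrary values, not just 0..9, because nothing is prefilled
        int[] arr = {-5, 100, 100, 42, 100};
        count(arr).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```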

Since I'm feeling generous, here's what might work with the Streams API:

// count() returns how many elements pass the filter (a long), not their sum
long totalEvenCount = Arrays.stream(arr).parallel().filter(i -> i % 2 == 0).count();
long totalOddCount = Arrays.stream(arr).parallel().filter(i -> i % 2 != 0).count();

//or this to count by individual numbers:
ConcurrentMap<Integer, List<Integer>> map1 = Arrays.stream(arr).parallel().boxed().collect(Collectors.groupingByConcurrent(i -> i));
map1.entrySet().stream().filter(e -> e.getValue().size() % 2 != 0).forEach(entry -> System.out.println(entry.getKey() + "=" + entry.getValue().size()));

As an exercise for the reader, perhaps you can look into how the various Collectors work, in order to write your own countingBy(i->i%2!=0) that outputs a map containing only the counts instead of a list of values.
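For the exercise above, the JDK already offers a downstream collector: passing `Collectors.counting()` to `Collectors.groupingByConcurrent` stores one `Long` per key instead of a `List` holding every occurrence. A minimal sketch follows; the class `CountingCollector` and its method names are hypothetical. Note that it filters on the count (`e.getValue()`), which is what "odd occurrence" asks for.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CountingCollector {

    // groupingByConcurrent with a counting() downstream keeps one Long per key
    static ConcurrentMap<Integer, Long> countOccurrences(int[] arr) {
        return Arrays.stream(arr).parallel().boxed()
                .collect(Collectors.groupingByConcurrent(Function.identity(), Collectors.counting()));
    }

    // Keeps only the values whose count is odd (filtering on the count, not the key)
    static Map<Integer, Long> oddOccurrences(int[] arr) {
        return countOccurrences(arr).entrySet().stream()
                .filter(e -> e.getValue() % 2 != 0)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        int[] arr = {1, 2, 2, 3, 3, 3, 4, 4, 4, 4};
        oddOccurrences(arr).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

This still boxes each `int` via `boxed()`, so it avoids the per-key lists but not the boxing overhead discussed in the comments.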

Thomas Timbul
  • Thanks for spotting the bug. I have edited the program and now both sequential and parallel outputs are the same. However, parallel execution is taking 3 times longer. :( – Dhananjay May 25 '18 at 10:29
  • Have added some Streams API code for you to try and get inspiration from. – Thomas Timbul May 25 '18 at 10:40
  • Thanks for the answer. I understand your approach now. It's cleaner and functional. However, it's slower than the imperative way. Do you know why? – Dhananjay May 28 '18 at 04:50
  • @Dhananjay As a guess, most likely due to the overhead associated with lambdas. Lambdas are objects in their own right. A plain for loop can easily be optimised by the compiler, but the lambda object creation/invocation cycle can't be as readily. It depends on how you do the computation as well - note the use of `boxed()` in my code, which converts all the `int`s into `Integer`s - that is a lot of overhead! You could create the array as `Integer[]` and see how that performs in your tests, as comparing `int` to `Integer` is simply not fair. – Thomas Timbul May 29 '18 at 10:37