11

I have a requirement to use the Java Stream API to process a stream of events from a system and apply a data cleanup step that removes repeated events. This means removing the same event when it is repeated multiple times in sequence, not creating a list of distinct events. Most of the Java Stream API examples available online target creating a distinct output from a given input.

For example, for the input stream

[a, b, c, a, a, a, a, d, d, d, c, c, e, e, e, e, e, e, f, f, f]

the output List or Stream should be

[a, b, c, a, d, c, e, f]

My current implementation (not using the Stream API) looks like this:

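import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedList;
import java.util.List;

public class Test {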
    public static void main(String[] args) {
        String fileName = "src/main/resources/test.log";
        try {
            List<String> list = Files.readAllLines(Paths.get(fileName));
            LinkedList<String> acc = new LinkedList<>();

            for (String line: list) {
                if (acc.isEmpty())
                    acc.add(line);
                else if (! line.equals(acc.getLast()) )
                    acc.add(line);
            }

            System.out.println(list);
            System.out.println(acc);

        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
    }
}

Output:

[a, b, c, a, a, a, a, d, d, d, c, c, e, e, e, e, e, e, f, f, f]
[a, b, c, a, d, c, e, f]

I've tried various examples with reduce, groupingBy, etc., without success. I can't seem to find a way to compare each stream element with the last element in my accumulator, if there is such a possibility.

Amitoj
  • 5
    As a side note, consider reading [“When to use LinkedList over ArrayList?”](http://stackoverflow.com/q/322715/2711488). Simply said, you almost never want to use `LinkedList`… – Holger Jan 16 '17 at 14:26
  • Are the repeated items necessarily consecutive? Could you have another "a" after, e.g., a "d"? And if, should it be removed or not? – Mureinik Jan 16 '17 at 15:03
  • 2
    @Mureinik The statement *"This is removing the same event repeated multiple times in sequence*" already covers this scenario IMO. – Chetan Kinger Jan 16 '17 at 15:08
  • 1
    @CKing I completely skipped over that sentence, for some reason. Mea culpa. – Mureinik Jan 16 '17 at 15:13

6 Answers

9

You can use IntStream to get hold of the index positions in the List and use them to your advantage as follows:

// Keep index i when its element differs from the next one, or when i is the last index.
List<String> acc = IntStream
        .range(0, list.size())
        .filter(i -> ((i < list.size() - 1 && !list.get(i).equals(list.get(i + 1)))
                || i == list.size() - 1))
        .mapToObj(i -> list.get(i))
        .collect(Collectors.toList());
System.out.println(acc);

Explanation

  1. IntStream.range(0,list.size()) : Returns a sequence of primitive int-valued elements which will be used as the index positions to access the list.
  2. filter(i -> ((i < list.size() - 1 && !list.get(i).equals(list.get(i + 1))) || i == list.size() - 1)) : Proceed only if the element at the current index position is not equal to the element at the next index position, or if the last index position has been reached.
  3. mapToObj(i -> list.get(i)) : Convert the IntStream of index positions to a Stream<String> of the elements at those positions.
  4. collect(Collectors.toList()) : Collect the results in a List.
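
The steps above can be put together as a small runnable sketch. The class name `IndexDedupSketch` and the method `collapseRuns` are hypothetical names used only for illustration, and the sketch assumes a random-access list without null elements (an index lookup on a `LinkedList` would make this quadratic).

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class IndexDedupSketch {

    // Keeps the element at index i only if it is the last element
    // or differs from the element that follows it.
    static <T> List<T> collapseRuns(List<T> list) {
        return IntStream.range(0, list.size())
                .filter(i -> i == list.size() - 1 || !list.get(i).equals(list.get(i + 1)))
                .mapToObj(list::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "a", "b", "c", "a", "a", "a", "a", "d", "d", "d",
                "c", "c", "e", "e", "e", "e", "e", "e", "f", "f", "f");
        System.out.println(collapseRuns(input)); // prints [a, b, c, a, d, c, e, f]
    }
}
```

Because every index is inspected independently and nothing is mutated, this variant should also tolerate `.parallel()` on the `IntStream`, at the cost of needing the whole list in memory up front.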
Chetan Kinger
  • Hi @CKing, thanks for your quick response. I just tried your solution, which seems to be logically correct, but I'm not getting the desired output. Please check https://gist.github.com/amitoj/6b1705cd127e282cf87921ebe9e5d82e The output is the same as the input. – Amitoj Jan 17 '17 at 07:43
  • @Amitoj I tested it out in Ideone and works as expected. Please see the [stdout](http://ideone.com/8ghrld) for my run. Have you copied my solution as is and are you sure there is no other bug in your code? – Chetan Kinger Jan 17 '17 at 08:18
  • 1
    Well, the obvious problem is that this code only works with test data, i.e. String literals, but not when reading strings from a file. The reason is described in “[How do I compare strings in Java?](http://stackoverflow.com/q/513832/2711488)” – Holger Jan 17 '17 at 13:32
  • 1
    @Holger My bad.. Thanks for the comment. I am aware of how to compare `String`s in Java and I can see from the code in the question that the OP is too. This happens sometimes. (In this particular case, my brain seems to have read `IntStream` and forgotten that the values in the `List` are `String`) – Chetan Kinger Jan 17 '17 at 13:49
  • Thanks @CKing, I'm still trying to wrap my head around your answer, but it works and I will accept it as the answer. However, I should've mentioned earlier that I want to apply the solution to a continuous stream of data (the source could be TCP/UDP sockets or a web socket), in which case list.size() is not available to us because we would be applying the operation directly on a stream. Do you have any ideas how to achieve that? – Amitoj Jan 18 '17 at 12:00
  • @Amitoj Let me think about it and get back to you. Meanwhile, you could post a new question (don't edit this one since it has reached a logical point) so that others can help you out as well. Also, let me know your doubts about this answer.. – Chetan Kinger Jan 18 '17 at 13:38
  • @Amitoj Is that stream of data related to Java 8's Streams API? And if so, how? – Ömer Erden Jan 19 '17 at 05:44
  • But you are not "streaming"! Because you need to know the size of the item set and you need to access the list. – dash1e May 31 '21 at 16:30
3

You might use a custom Collector to achieve your goal. Please find details below:

Stream<String> lines =  Files.lines(Paths.get("distinct.txt"));
LinkedList<String> values = lines.collect(Collector.of(
            LinkedList::new,
            (list, string) -> {
                if (list.isEmpty())
                    list.add(string);
                else if (!string.equals(list.getLast()))
                    list.add(string);
            },
            (left, right) -> {
                left.addAll(right);
                return left;
            }
    ));

values.forEach(System.out::println);

However, it might have some issues when a parallel stream is used.
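
A minimal sketch of a combiner that also checks the boundary between two partial results, which is what a parallel run needs: when the last element of the left part equals the first element of the right part, that first element is dropped before merging. The class name `RunCollapsingCollector` and the method `collapsingRuns` are hypothetical names chosen for illustration, and the sketch assumes an ordered stream without null elements.

```java
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collector;

class RunCollapsingCollector {

    // Collector that drops consecutive duplicates, both inside each
    // partial result and at the seam where two partial results meet.
    static <T> Collector<T, LinkedList<T>, LinkedList<T>> collapsingRuns() {
        return Collector.of(
                LinkedList::new,
                (acc, item) -> {
                    // Accumulator: append only when the item differs from the last one kept.
                    if (acc.isEmpty() || !item.equals(acc.getLast())) {
                        acc.add(item);
                    }
                },
                (left, right) -> {
                    // Combiner: left precedes right in encounter order.
                    if (left.isEmpty()) {
                        return right;
                    }
                    if (!right.isEmpty() && left.getLast().equals(right.getFirst())) {
                        right.removeFirst(); // same run continues across the seam
                    }
                    left.addAll(right);
                    return left;
                });
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "a", "b", "c", "a", "a", "a", "a", "d", "d", "d",
                "c", "c", "e", "e", "e", "e", "e", "e", "f", "f", "f");
        List<String> result = input.parallelStream().collect(collapsingRuns());
        System.out.println(result); // prints [a, b, c, a, d, c, e, f]
    }
}
```

Each partial list is already free of consecutive duplicates, so inspecting only the one element on each side of the seam is enough to keep the merged result correct.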

Anton Balaniuc
  • 3
    The issue with parallel execution is that the combiner doesn’t check whether the last element of `left` matches the first element of `right`. In that case, the first element must not be added. A correct combiner would be `if(left.isEmpty()) return right; else if(!right.isEmpty()) left.addAll(left.getLast().equals(right.getFirst())? right.subList(1, right.size()): right); return left;` – Holger Jan 17 '17 at 13:28
2

Another concise syntax would be

AtomicReference<Character> previous = new AtomicReference<>(null);
Stream.of('a', 'b', 'b', 'a').filter(cur -> !cur.equals(previous.getAndSet(cur)));
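
A fuller sketch of the same idea with a terminal operation attached, so it can be run as is. The class name `PreviousElementFilter` and the method `collapseRuns` are hypothetical names for illustration. The predicate is stateful, which the Stream API documentation advises against, so this sketch forces sequential execution and assumes no null elements; it should not be relied on with a parallel stream.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PreviousElementFilter {

    // Remembers the previously seen element and lets an element through
    // only when it differs from that predecessor.
    static <T> List<T> collapseRuns(Stream<T> source) {
        AtomicReference<T> previous = new AtomicReference<>();
        return source
                .sequential() // the stateful predicate depends on encounter order
                .filter(cur -> !cur.equals(previous.getAndSet(cur)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(collapseRuns(Stream.of('a', 'b', 'b', 'a'))); // prints [a, b, a]
    }
}
```

Since it never asks for the size of the source, this form also applies to a `Stream` that is not backed by a list.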
Abhinav Atul
0

EDIT: as commented by @Bolzano, this approach does not meet the requirement.

If t is the input stream then

Map<String,Boolean> s = new HashMap<>();
Stream<String> u = t.filter(e -> s.put(e, Boolean.TRUE)==null);

will produce a Stream of unique elements without creating a List.

Then a plain

List<String> m = u.collect(Collectors.toList());

can create a List of unique elements.

I do not understand why solutions as lengthy as the ones @CKing and @Anton propose would be required. Am I missing something?

Serg M Ten
  • Yes you are missing something, compare the input array and output array again. He doesn't want unique elements, he wants to convert repeated sequence of elements into single one. If you want to collect unique elements your solution is not short also, you can just use distinct() method of stream then collect. -> list.stream().distinct().collect(... – Ömer Erden Jan 18 '17 at 14:11
  • Yes! @Bolzano you are right but then a very similar approach `Map s = new HashMap<>(); Stream u = t.filter(e -> !e.equals(s.put(Boolean.TRUE, e)));` should do the job of filtering the ones that are equal to the previous. Doesn't it? – Serg M Ten Jan 18 '17 at 14:25
  • Consider that the stream is on the first element and its value is "a", so "a" is marked as true in your hashmap. Then the stream finds the second "a" after 3 different elements; in this situation the second "a" will be filtered out because it's already in your hashmap. This behaviour is the same as what distinct() does. So yes, it will filter, but the main question is different. – Ömer Erden Jan 18 '17 at 14:40
  • In the code fragment at the comment, the HashMap only remembers the last element seen. Therefore ["a","b","c","d","a"] will not remove the second "a" because `s.put(Boolean.TRUE, e)` will return "d" so `!e.equals("d")` will be `true`. Right? – Serg M Ten Jan 18 '17 at 14:57
  • 1
    Sorry, I missed s.put(Boolean.TRUE, e) in your comment. It looks like `t.filter(e -> !e.equals(s.put(Boolean.TRUE, e)));` will do the job as well. But this approach is not thread-safe, because you have state. Arguments should be effectively final, and this approach is kind of a trick to bypass that rule. But it will work as expected in a single thread. – Ömer Erden Jan 18 '17 at 15:23
  • Certainly this approach relies on the elements being processed by a single thread in sequential order. But is it possible to perform the consecutive deduplication filtering in parallel? – Serg M Ten Jan 18 '17 at 16:09
  • Why not? It's possible with fork/join. – Ömer Erden Jan 18 '17 at 16:15
  • To fork/join the stream should be collected and split before processing and each thread should be aware of the last element of the sublist assigned to the previous thread. But I can't see how an equivalent functionality would be feasible with something like `stream().parallel().forEach(e->deduplicate(e))` – Serg M Ten Jan 18 '17 at 16:59
  • @SergioMontoro Can you elaborate on what you mean by lengthy? – Chetan Kinger Jan 20 '17 at 17:09
0

With Java 7, you can do this using an iterator:

Iterator<Integer> iterator = list.values().iterator();
Integer previousValue = null;

while(iterator.hasNext()) {
    Integer currentValue = iterator.next();
    if(currentValue.equals(previousValue)){
        iterator.remove();
    }
    previousValue = currentValue;
}
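
A self-contained sketch of the same iterator technique applied to a plain mutable `List`, written without lambdas so it stays within Java 7. The class name `IteratorDedupSketch` and the method `collapseRunsInPlace` are hypothetical; the sketch assumes the list's iterator supports `remove()` (a fixed-size list from `Arrays.asList` does not) and that the list holds no null elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class IteratorDedupSketch {

    // Removes, in place, every element that equals its immediate predecessor.
    static <T> void collapseRunsInPlace(List<T> list) {
        Iterator<T> iterator = list.iterator();
        T previous = null;
        while (iterator.hasNext()) {
            T current = iterator.next();
            if (current.equals(previous)) {
                iterator.remove(); // same value as the element just before it
            }
            previous = current;
        }
    }

    public static void main(String[] args) {
        List<Integer> events = new ArrayList<Integer>(
                Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 1));
        collapseRunsInPlace(events);
        System.out.println(events); // prints [1, 2, 3, 4, 1]
    }
}
```

Removing through the iterator avoids a `ConcurrentModificationException`, though on an `ArrayList` each removal shifts the remaining elements, so building a new list may be cheaper for long inputs.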
Radu Linu
  • Note that this doesn't actually answer the question for various reasons: 1) the OP is not asking about how to do it in place, i.e. creating a new list is ok 2) the OP already has a working pre-Java 8 version and is explicitly asking about how you'd do it with _streams_. – Thomas Mar 12 '20 at 14:44
-1

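Please try this solution:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class TestDuplicatePreviousEvent {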

public static void main(String[] args) {
    List<Integer> inputData = new ArrayList<>();
    List<Integer> outputData = new ArrayList<>();

    inputData.add(1);
    inputData.add(2);
    inputData.add(2);
    inputData.add(3);
    inputData.add(3);
    inputData.add(3);
    inputData.add(4);
    inputData.add(4);
    inputData.add(4);
    inputData.add(4);
    inputData.add(1);

    AtomicInteger index = new AtomicInteger();
    Map<Integer, Integer> valueByIndex = inputData.stream().collect(Collectors.toMap(i -> index.incrementAndGet(), i -> i));

    outputData = valueByIndex.entrySet().stream().filter(i -> !i.getValue().equals(valueByIndex.get(i.getKey() - 1))).map(x -> x.getValue()).collect(Collectors.toList());
    System.out.println(outputData);
}

}

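Output: [1, 2, 3, 4, 1]

Solution without a map:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class TestDuplicatePreviousEvent {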

public static void main(String[] args) {
    List<Integer> inputData = new ArrayList<>();
    List<Integer> outputData = new ArrayList<>();

    inputData.add(1);
    inputData.add(2);
    inputData.add(2);
    inputData.add(3);
    inputData.add(3);
    inputData.add(3);
    inputData.add(4);
    inputData.add(4);
    inputData.add(4);
    inputData.add(4);
    inputData.add(1);
    inputData.add(1);
    inputData.add(1);
    inputData.add(4);
    inputData.add(4);

    AtomicInteger index = new AtomicInteger();
    outputData = inputData.stream().filter(i -> filterInputEvents(i, index, inputData)).collect(Collectors.toList());
    System.out.println(outputData);
}

private static boolean filterInputEvents(Integer i, AtomicInteger index, List<Integer> inputData) {

    if (index.get() == 0) {
        index.incrementAndGet();
        return true;
    }
    return !(i.equals(inputData.get(index.getAndIncrement() - 1)));
}

}

Rohit Gulati
  • 1
    This solution would require one additional step for converting the input `List` into a `Map` as the input data is coming from a file. – Chetan Kinger Jan 16 '17 at 14:33