
I have the following example data set that I want to transform/reduce using the Java Stream API, merging adjacent rows based on the direction value:

Direction    int[]
IN           1, 2
OUT          3, 4
OUT          5, 6, 7
IN           8
IN           9
IN           10, 11
OUT          12, 13
IN           14

to

Direction    int[]
IN           1, 2
OUT          3, 4, 5, 6, 7
IN           8, 9, 10, 11
OUT          12, 13
IN           14
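For reference, the transformation above amounts to a run-length merge of adjacent rows sharing the same direction. As a non-stream baseline (a sketch, assuming a simple `Tuple(direction, data)` record that is not in the original code), a plain loop might look like:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MergeDemo {
    enum Direction { IN, OUT }

    record Tuple(Direction direction, int[] data) { }

    // Merge each tuple into the previous one when the directions match.
    static List<Tuple> mergeAdjacent(List<Tuple> input) {
        List<Tuple> out = new ArrayList<>();
        for (Tuple t : input) {
            if (!out.isEmpty() && out.get(out.size() - 1).direction() == t.direction()) {
                Tuple last = out.remove(out.size() - 1);
                int[] merged = Arrays.copyOf(last.data(), last.data().length + t.data().length);
                System.arraycopy(t.data(), 0, merged, last.data().length, t.data().length);
                out.add(new Tuple(t.direction(), merged));
            } else {
                out.add(t);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Tuple> input = List.of(
            new Tuple(Direction.IN, new int[]{1, 2}),
            new Tuple(Direction.OUT, new int[]{3, 4}),
            new Tuple(Direction.OUT, new int[]{5, 6, 7}),
            new Tuple(Direction.IN, new int[]{8}));
        for (Tuple t : mergeAdjacent(input)) {
            System.out.println(t.direction() + " " + Arrays.toString(t.data()));
        }
        // prints: IN [1, 2] / OUT [3, 4, 5, 6, 7] / IN [8]
    }
}
```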

The code I've written so far:

enum Direction { IN, OUT }

class Tuple {
  final Direction direction;
  final int[] data;

  Tuple(Direction direction, int[] data) {
      this.direction = direction;
      this.data = data;
  }

  Direction getDirection() { return direction; }

  int[] getData() { return data; }

  public Tuple merge(Tuple t) {
      return new Tuple(direction, concat(getData(), t.getData()));
  }
}

private static int[] concat(int[] first, int[] second) {
    int[] result = Arrays.copyOf(first, first.length + second.length);
    System.arraycopy(second, 0, result, first.length, second.length);
    return result;
}

List<Tuple> reduce = tupleStream.reduce(new ArrayList<>(), WDParser::add, WDParser::combine);

private static List<Tuple> combine(List<Tuple> list1, List<Tuple> list2) {
    System.out.println("combine");
    list1.addAll(list2);
    return list1;
}

private static List<Tuple> add(List<Tuple> list, Tuple t) {
    System.out.println("add");
    if (list.size() == 0) {
        list.add(t);
    } else if (list.size() > 0) {
        int lastIndex = list.size() - 1;
        Tuple last = list.get(lastIndex);
        if (last.getDirection() == t.getDirection())
            list.set(lastIndex, last.merge(t));
        else
            list.add(t);
    }

    return list;
}

I believe there is a better and simpler alternative for achieving the same result.

The online examples and blog posts I've found for the Java Stream API's reduce/combine only use the `Integer::sum` function. I'm hoping to build this up for more complex scenarios.
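For contrast, this is the typical `reduce` pattern those blog examples show, where identity, accumulator, and combiner all operate on immutable values:

```java
import java.util.stream.Stream;

public class SumDemo {
    public static void main(String[] args) {
        // The canonical three-argument reduce: identity, accumulator, combiner.
        int sum = Stream.of(1, 2, 3, 4).reduce(0, Integer::sum, Integer::sum);
        System.out.println(sum); // 10
    }
}
```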

Stefan Zobel
Amitoj

4 Answers


I think your solution is pretty nice already, especially as using a reduction enables parallelism easily, compared to collecting into a shared outside container. However, it's easier to use `collect` instead of `reduce`, as Holger pointed out. Furthermore, the conditions in the accumulator can be simplified a bit, and you forgot to merge the last and first elements in the combiner:

List<Tuple> reduce = tupleStream.collect(ArrayList::new, WDParser::add, WDParser::combine);

private static List<Tuple> combine(List<Tuple> list1, List<Tuple> list2)
{
    if (!list2.isEmpty())
    {
        add(list1, list2.remove(0)); // merge lists in the middle if necessary
        list1.addAll(list2);         // add all the rest
    }
    return list1;
}

private static List<Tuple> add(List<Tuple> list, Tuple t)
{
    int lastIndex = list.size() - 1;
    if (list.isEmpty() || list.get(lastIndex).getDirection() != t.getDirection())
    {
        list.add(t);
    }
    else
    {
        list.set(lastIndex, list.get(lastIndex).merge(t));
    }
    return list;
}

Instead of using indexes to access the first/last element, you could even use a `LinkedList` and its `addFirst`/`addLast` and `removeFirst`/`removeLast` methods.

Malte Hartwig
  • This way of using `reduce` will break awfully when trying it with parallel execution. The functions passed to `reduce` should never modify the incoming objects. This kind of operation should be implemented using `collect`. See [Mutable Reduction](https://docs.oracle.com/javase/10/docs/api/java/util/stream/package-summary.html#MutableReduction)… – Holger Sep 04 '18 at 18:29
  • @Holger I thought about it earlier, but only found the issues now that you pointed it out again. Have changed it to a collect. Is that enough or do I have to create new lists in the accumulator and combiner, as well? – Malte Hartwig Sep 04 '18 at 18:41
  • You don't need to create new lists in the accumulator or combiner; that's the whole point of collectors. The only thing that is missing is that the combiner should be able to handle an empty `list2` argument as well (this may happen when you `collect` after a `filter`). – Holger Sep 05 '18 at 05:54
  • Thanks, I'll check the solution later today and update. – Amitoj Sep 05 '18 at 10:41
  • What's the purpose of the `combine` method? I haven't seen it being called in either of the implementations. – Amitoj Sep 05 '18 at 12:22
  • @Amitoj it will only be called in parallel streams. Simply speaking, the stream will be split into chunks in that case. Each part will be collected using the "accumulator" (`add`). Afterwards, these collection results will be "combined" into the final result. For a sequential stream, there is only 1 part (the whole stream) accumulated, hence there is no need to use the combiner. – Malte Hartwig Sep 05 '18 at 12:48
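The comment above can be observed directly. In this sketch (a hypothetical demo using plain integers instead of `Tuple`), the "combine" message only prints when the stream runs in parallel, because only then is the stream split into chunks that must be merged:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public class CombinerDemo {
    public static void main(String[] args) {
        // Sequential: only the accumulator runs; the combiner is never needed.
        List<Integer> seq = IntStream.range(0, 4).boxed()
            .collect(ArrayList::new,
                     (list, i) -> { System.out.println("add " + i); list.add(i); },
                     (l1, l2) -> { System.out.println("combine"); l1.addAll(l2); });
        System.out.println(seq);

        // Parallel: the stream is split, and the combiner merges the partial lists.
        // (How often "combine" prints depends on how the pool splits the work.)
        List<Integer> par = IntStream.range(0, 4).boxed().parallel()
            .collect(ArrayList::new,
                     (list, i) -> list.add(i),
                     (l1, l2) -> { System.out.println("combine"); l1.addAll(l2); });
        System.out.println(par);
    }
}
```

Note that `collect` preserves encounter order even in parallel, so both results come out as `[0, 1, 2, 3]`.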

How about this? First, define a small helper method:

private static Tuple mergeTwo(Tuple left, Tuple right) {
    int[] leftArray = left.getData();
    int[] rightArray = right.getData();
    int[] result = new int[leftArray.length + rightArray.length];
    System.arraycopy(leftArray, 0, result, 0, leftArray.length);
    System.arraycopy(rightArray, 0, result, leftArray.length, rightArray.length);
    return new Tuple(left.getDirection(), result);
}

This is close to your concat/merge, I guess, but as a single method: basically a way to merge two `Tuple`s together.

And a helper method to produce the needed `Collector`; you can put this into a utility class so that it can be re-used:

private static Collector<Tuple, ?, List<Tuple>> mergedTuplesCollector() {
    class Acc {

        ArrayDeque<Tuple> deque = new ArrayDeque<>();

        void add(Tuple elem) {
            // Append at the tail so the deque keeps encounter order.
            Tuple last = deque.peekLast();
            if (last == null || last.getDirection() != elem.getDirection()) {
                deque.offerLast(elem);
            } else {
                deque.offerLast(mergeTwo(deque.pollLast(), elem));
            }
        }

        Acc merge(Acc right) {

            Tuple lastLeft = deque.peekLast();
            Tuple firstRight = right.deque.peekFirst();

            if (lastLeft != null && firstRight != null
                    && lastLeft.getDirection() == firstRight.getDirection()) {
                deque.offerLast(mergeTwo(deque.pollLast(), right.deque.pollFirst()));
            }
            deque.addAll(right.deque); // append whatever remains, in order

            return this;
        }

        public List<Tuple> finisher() {
            return new ArrayList<>(deque);
        }

    }
    return Collector.of(Acc::new, Acc::add, Acc::merge, Acc::finisher);
}

And usage would be, for example:

List<Tuple> merged = tuples.stream()
            .parallel()
            .collect(mergedTuplesCollector());
Eugene

This is an alternative approach that uses slightly different data structures.

If this is an option, changing from int[] to List<Integer> allows for more flexibility (not to mention avoiding creating/copying arrays multiple times):

class Tuple {
    Direction direction;
    List<Integer> data;
}

And the following function does the merging on a Deque collection:

private static List<Integer> next(Deque<Tuple> t, Direction d) {
    if (!t.isEmpty() && t.peekLast().getDirection() == d) {
        return t.peekLast().getData();
    } else {
        Tuple next = new Tuple();
        next.direction = d;
        next.data = new ArrayList<>();
        t.addLast(next);
        return next.data;
    }
}

And with that, the stream can look as simple as:

Deque<Tuple> deq = new LinkedList<>(); //the final collection of tuples

tuples.stream()
.flatMap(tp -> tp.getData().stream()
                 .map(d -> Pair.of(tp.getDirection(), Integer.valueOf(d))))
.forEach(el -> next(deq, el.getLeft()).add(el.getRight()));
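`Pair` here presumably comes from a library such as Apache Commons Lang; if you want to stay dependency-free, `Map.entry` (Java 9+) works the same way. A self-contained sketch (with `forEachOrdered`, per the comment discussion below, and a minimal `Tuple` assumed for illustration):

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class FlatMapDemo {
    enum Direction { IN, OUT }

    static class Tuple {
        Direction direction;
        List<Integer> data;
        Direction getDirection() { return direction; }
        List<Integer> getData() { return data; }
    }

    // Same helper as in the answer: returns the data list to append to,
    // creating a new tail tuple when the direction changes.
    static List<Integer> next(Deque<Tuple> t, Direction d) {
        if (!t.isEmpty() && t.peekLast().getDirection() == d) {
            return t.peekLast().getData();
        }
        Tuple next = new Tuple();
        next.direction = d;
        next.data = new ArrayList<>();
        t.addLast(next);
        return next.data;
    }

    public static void main(String[] args) {
        Tuple a = new Tuple(); a.direction = Direction.IN; a.data = List.of(1, 2);
        Tuple b = new Tuple(); b.direction = Direction.IN; b.data = List.of(3);

        Deque<Tuple> deq = new LinkedList<>();
        List.of(a, b).stream()
            .flatMap(tp -> tp.getData().stream()
                             .map(d -> Map.entry(tp.getDirection(), d)))
            .forEachOrdered(el -> next(deq, el.getKey()).add(el.getValue()));

        deq.forEach(t -> System.out.println(t.direction + " " + t.data));
        // prints: IN [1, 2, 3]
    }
}
```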
ernest_k
  • I think it's safer to use `forEachRemaining` instead of `forEach` here. The docs say that `forEach` is not deterministic about the order in which the elements are processed, and as far as I see, this could lead to inconsistencies with your approach. – Malte Hartwig Sep 04 '18 at 16:58
  • @MalteHartwig I think you meant `forEachOrdered` :) – fps Sep 04 '18 at 18:08
  • @FedericoPeraltaSchaffner hahaha, yes, confused it with Spliterator – Malte Hartwig Sep 04 '18 at 18:09
  • Thanks, I'm using int[] because it's easier to convert to & from Netty ByteBuf object. – Amitoj Sep 05 '18 at 11:57
  • Interesting use of stream flatMap, I'll keep this for future reference. – Amitoj Sep 05 '18 at 12:24

I've got two ideas on this topic. The first one is getting the indices, as in this answer, and grouping accordingly.

The second idea: if you already have a Stream, a custom Collector can be used (similar to the other solutions, though using a Deque):

private Collector<Tuple, ?, List<Tuple>> squashTuples() {
  return new Collector<Tuple, Deque<Tuple>, List<Tuple>>() {
    @Override
    public Supplier<Deque<Tuple>> supplier() {
      return ArrayDeque::new;
    }

    @Override
    public BiConsumer<Deque<Tuple>, Tuple> accumulator() {
      return (acc, e) -> {
        Objects.requireNonNull(e);
        if (!acc.isEmpty() && acc.peekLast().getDirection() == e.getDirection()) {
          acc.offerLast(acc.pollLast().merge(e));
        } else {
          acc.offerLast(e);
        }
      };
    }

    @Override
    public BinaryOperator<Deque<Tuple>> combiner() {
      return (left, right) -> {
        if (!left.isEmpty() && !right.isEmpty() && left.peekLast().getDirection() == right.peekFirst().getDirection()) {
          left.offerLast(left.pollLast().merge(right.pollFirst()));
        }
        left.addAll(right);
        return left;
      };
    }

    @Override
    public Function<Deque<Tuple>, List<Tuple>> finisher() {
      return ArrayList::new;
    }

    @Override
    public Set<Characteristics> characteristics() {
      return EnumSet.noneOf(Characteristics.class);
    }
  };
}
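Usage would then mirror the other collector-based answers. As a compact, self-contained sketch (same accumulator/combiner logic, but built with `Collector.of` and a hypothetical `Tuple` record, since the question's class is not shown in full):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.stream.Collector;

public class SquashDemo {
    enum Direction { IN, OUT }

    record Tuple(Direction direction, int[] data) {
        Tuple merge(Tuple t) {
            int[] merged = Arrays.copyOf(data, data.length + t.data.length);
            System.arraycopy(t.data, 0, merged, data.length, t.data.length);
            return new Tuple(direction, merged);
        }
    }

    // Same logic as the answer's Collector, expressed via Collector.of.
    static Collector<Tuple, Deque<Tuple>, List<Tuple>> squashTuples() {
        return Collector.of(
            ArrayDeque::new,
            (acc, e) -> {
                if (!acc.isEmpty() && acc.peekLast().direction() == e.direction()) {
                    acc.offerLast(acc.pollLast().merge(e));
                } else {
                    acc.offerLast(e);
                }
            },
            (left, right) -> {
                if (!left.isEmpty() && !right.isEmpty()
                        && left.peekLast().direction() == right.peekFirst().direction()) {
                    left.offerLast(left.pollLast().merge(right.pollFirst()));
                }
                left.addAll(right);
                return left;
            },
            ArrayList::new);
    }

    public static void main(String[] args) {
        List<Tuple> squashed = List.of(
                new Tuple(Direction.IN, new int[]{1, 2}),
                new Tuple(Direction.IN, new int[]{3}),
                new Tuple(Direction.OUT, new int[]{4}))
            .stream()
            .collect(squashTuples());
        squashed.forEach(t ->
            System.out.println(t.direction() + " " + Arrays.toString(t.data())));
        // prints: IN [1, 2, 3] / OUT [4]
    }
}
```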
Flown
  • I don't think that a `combiner` needs checks for `isEmpty` in this case – Eugene Sep 05 '18 at 08:30
  • @Eugene I think you're basically right. There should not be a merging step with an empty container, but are there any guarantees? I cannot see any in the documentation. – Flown Sep 05 '18 at 09:02
  • well, considering that an `accumulator` will always be called before a `combiner`, it seems that this is always so; but I am just wondering if this is documented as such... I'll try to find this – Eugene Sep 05 '18 at 09:04