
In our current setup, we have the following set of APIs:

Consumer<T> start();
Consumer<T> performDailyAggregates();
Consumer<T> performLastNDaysAggregates();
Consumer<T> repopulateScores();
Consumer<T> updateDataStore();

One of our schedulers runs these tasks in sequence, e.g.

private void performAllTasks(T data) {
    start().andThen(performDailyAggregates())
            .andThen(performLastNDaysAggregates())
            .andThen(repopulateScores())
            .andThen(updateDataStore())
            .accept(data);
}

While reviewing this, I thought of moving to a more flexible implementation (1) of performing the tasks, which would look like:

// NOOP below stands for 'anything -> {}'
private void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
    consumerList.reduce(NOOP, Consumer::andThen).accept(data);
}

What strikes me now is that the Javadoc of Stream.reduce clearly states the following about its accumulator:

accumulator - an associative, non-interfering, stateless function for combining two values

Next, I looked at the question "How to ensure order of processing in java8 streams?", because I need the processing to be ordered (processing order the same as encounter order).

Okay, a stream generated from a List is ordered, so unless the stream is made parallel before the reduce, the following implementation (2) should work:

private void performAllTasks(List<Consumer<T>> consumerList, T data) {
    consumerList.stream().reduce(NOOP, Consumer::andThen).accept(data);
}
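
A minimal, self-contained way to observe what the List-based variant above does in a sequential run. The class name OrderCheck, the helper runReduced and the index-recording consumers are made up for illustration, and NOOP is assumed to be `t -> {}`. This only observes one run; it is not a proof of the guarantee.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class OrderCheck {
    // Hypothetical helper: builds n consumers that each append their index to
    // the list they receive, reduces them with andThen, and runs the result.
    static List<Integer> runReduced(int n) {
        Consumer<List<Integer>> noop = t -> {};   // stands in for NOOP
        List<Consumer<List<Integer>>> consumers = IntStream.range(0, n)
                .mapToObj(i -> (Consumer<List<Integer>>) out -> out.add(i))
                .collect(Collectors.toList());
        List<Integer> log = new ArrayList<>();
        consumers.stream().reduce(noop, Consumer::andThen).accept(log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runReduced(5));   // prints [0, 1, 2, 3, 4]
    }
}
```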

Q. Does assumption (2) hold true? Is it guaranteed to always execute the consumers in the same order as the original code?

Q. Is it possible to expose (1) to callers as well, so that they can pass in the tasks as a stream?

Naman
  • 1
    The assumption “2” should not contain the phrase to “unless the stream is made parallel”, as a solution can only be correct or incorrect. Depending on sequential execution would be incorrect (I’m not saying that this is incorrect, the answer is more complicated). But, what do you mean with “to expose 1 as well”? – Holger Jan 23 '20 at 17:27
  • @Holger By that I meant, can I just create implementation **1** in my service and let other services making use of it pass on a `Stream<Consumer<T>> consumerStream` which brings in a complexity of `UNORDERED` stream as well. But then, can I still change the stream to follow the encounter order? (I hope I am able to explain better) – Naman Jan 23 '20 at 18:25
  • 3
    I think what you are trying to ask here is: can my method accept a `Stream<Consumer<T>>` and even if someone passes a `parallel` stream or `unordered()` called explicitly on it, I will still be able to execute the consumers in the exact order in which they have been received. – Eugene Jan 23 '20 at 18:29
  • @Eugene In fact. Thank you for the improved wording. – Naman Jan 23 '20 at 18:30
  • 2
    When the stream is unordered, there is no encounter order and no way to construct one. The assumption that whatever order you exhibit in sequential evaluation has a special meaning, is fundamentally flawed. For an ordered stream, however, Andreas already pointed out that it would work. However, there are other, practical problems with this approach. I’ll assemble an answer. – Holger Jan 23 '20 at 18:33

2 Answers


As Andreas pointed out, Consumer::andThen is an associative function and while the resulting consumer may have a different internal structure, it is still equivalent.

But let's debug it

public static void main(String[] args) {
    performAllTasks(IntStream.range(0, 10)
        .mapToObj(i -> new DebuggableConsumer(""+i)), new Object());
}
private static <T> void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
    Consumer<T> reduced = consumerList.reduce(Consumer::andThen).orElse(x -> {});
    reduced.accept(data);
    System.out.println(reduced);
}
static class DebuggableConsumer implements Consumer<Object> {
    private final Consumer<Object> first, second;
    private final boolean leaf;
    DebuggableConsumer(String name) {
        this(x -> System.out.println(name), x -> {}, true);
    }
    DebuggableConsumer(Consumer<Object> a, Consumer<Object> b, boolean l) {
        first = a; second = b;
        leaf = l;
    }
    public void accept(Object t) {
        first.accept(t);
        second.accept(t);
    }
    @Override public Consumer<Object> andThen(Consumer<? super Object> after) {
        return new DebuggableConsumer(this, after, false);
    }
    public @Override String toString() {
        if(leaf) return first.toString();
        return toString(new StringBuilder(200), 0, 0).toString();
    }
    private StringBuilder toString(StringBuilder sb, int preS, int preEnd) {
        int myHandle = sb.length()-2;
        sb.append(leaf? first: "combined").append('\n');
        if(!leaf) {
            int nPreS=sb.length();
            ((DebuggableConsumer)first).toString(
                sb.append(sb, preS, preEnd).append("\u2502 "), nPreS, sb.length());
            nPreS=sb.length();
            sb.append(sb, preS, preEnd);
            int lastItemHandle=sb.length();
            ((DebuggableConsumer)second).toString(sb.append("  "), nPreS, sb.length());
            sb.setCharAt(lastItemHandle, '\u2514');
        }
        if(myHandle>0) {
            sb.setCharAt(myHandle, '\u251c');
            sb.setCharAt(myHandle+1, '\u2500');
        }
        return sb;
    }
}

will print

0
1
2
3
4
5
6
7
8
9
combined
├─combined
│ ├─combined
│ │ ├─combined
│ │ │ ├─combined
│ │ │ │ ├─combined
│ │ │ │ │ ├─combined
│ │ │ │ │ │ ├─combined
│ │ │ │ │ │ │ ├─combined
│ │ │ │ │ │ │ │ ├─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@378fd1ac
│ │ │ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@49097b5d
│ │ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@6e2c634b
│ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@37a71e93
│ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7e6cbb7a
│ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7c3df479
│ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7106e68e
│ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7eda2dbb
│ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@6576fe71
└─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@76fb509a

whereas changing the reduction code to

private static <T> void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
    Consumer<T> reduced = consumerList.parallel().reduce(Consumer::andThen).orElse(x -> {});
    reduced.accept(data);
    System.out.println(reduced);
}

prints on my machine

0
1
2
3
4
5
6
7
8
9
combined
├─combined
│ ├─combined
│ │ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@49097b5d
│ │ └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@6e2c634b
│ └─combined
│   ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@37a71e93
│   └─combined
│     ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7e6cbb7a
│     └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7c3df479
└─combined
  ├─combined
  │ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7106e68e
  │ └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7eda2dbb
  └─combined
    ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@6576fe71
    └─combined
      ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@76fb509a
      └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@300ffa5d

illustrating the point of Andreas’ answer, but also highlighting an entirely different problem. You may max it out by using, e.g. IntStream.range(0, 100) in the example code.

The result of the parallel evaluation is actually better than the sequential evaluation, as the sequential evaluation creates an unbalanced tree. When accepting an arbitrary stream of consumers, this can be an actual performance issue or even lead to a StackOverflowError when trying to evaluate the resulting consumer.
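
To make the nesting problem visible without actually provoking a StackOverflowError, here is a rough sketch that measures the call-stack depth reached by the consumers. The class NestingDepth, the PROBE consumer and the two helper methods are invented for this illustration, and the exact numbers depend on the JVM, so only the difference between the two values is meaningful.

```java
import java.util.Collections;
import java.util.function.Consumer;

class NestingDepth {
    // Deepest call stack observed by PROBE since the last reset.
    static int maxDepth;

    // Hypothetical probe consumer: records how deep the stack is when it runs.
    static final Consumer<Object> PROBE = x ->
            maxDepth = Math.max(maxDepth, Thread.currentThread().getStackTrace().length);

    // Depth reached when n consumers are chained by a sequential reduce.
    static int depthOfReducedChain(int n) {
        maxDepth = 0;
        Collections.nCopies(n, PROBE).stream()
                .reduce(Consumer::andThen).orElse(x -> {})
                .accept("data");
        return maxDepth;
    }

    // Depth reached when the same consumers are invoked one by one in a loop.
    static int depthOfLoop(int n) {
        maxDepth = 0;
        Collections.nCopies(n, PROBE).forEach(c -> c.accept("data"));
        return maxDepth;
    }

    public static void main(String[] args) {
        System.out.println("reduced chain: " + depthOfReducedChain(100));
        System.out.println("plain loop:    " + depthOfLoop(100));
    }
}
```

In the reduced chain, the first consumer only runs after every enclosing andThen wrapper has been entered, so the depth grows with the number of consumers, while the loop stays at a constant depth.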

For any nontrivial number of consumers, you actually want a balanced consumer tree, but using a parallel stream for that is not the right solution, as a) Consumer::andThen is a cheap operation with no real benefit from parallel evaluation and b) the balancing would depend on unrelated properties, like the nature of the stream source and the number of CPU cores, which determine when the reduction falls back to the sequential algorithm.

Of course, the simplest solution would be

private static <T> void performAllTasks(Stream<Consumer<T>> consumers, T data) {
    consumers.forEachOrdered(c -> c.accept(data));
}
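
A small sketch showing that this variant keeps the encounter order even when the caller hands in a parallel stream, as long as the stream is ordered. ForEachOrderedCheck, runParallel and the index-recording consumers are hypothetical names used only for this check.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class ForEachOrderedCheck {
    static <T> void performAllTasks(Stream<Consumer<T>> consumers, T data) {
        consumers.forEachOrdered(c -> c.accept(data));
    }

    // Hypothetical helper: runs n index-recording consumers via a parallel stream.
    static List<Integer> runParallel(int n) {
        List<Consumer<List<Integer>>> consumers = IntStream.range(0, n)
                .mapToObj(i -> (Consumer<List<Integer>>) out -> out.add(i))
                .collect(Collectors.toList());
        // Synchronized as a precaution: the actions may run on different threads.
        List<Integer> log = Collections.synchronizedList(new ArrayList<>());
        performAllTasks(consumers.parallelStream(), log);
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(runParallel(8));   // prints [0, 1, 2, 3, 4, 5, 6, 7]
    }
}
```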

But when you want to construct a compound Consumer for re-using, you may use

private static final int ITERATION_THRESHOLD = 16; // tune yourself

public static <T> Consumer<T> combineAllTasks(Stream<Consumer<T>> consumers) {
    List<Consumer<T>> consumerList = consumers.collect(Collectors.toList());
    if(consumerList.isEmpty()) return t -> {};
    if(consumerList.size() == 1) return consumerList.get(0);
    if(consumerList.size() < ITERATION_THRESHOLD)
        return balancedReduce(consumerList, Consumer::andThen, 0, consumerList.size());
    return t -> consumerList.forEach(c -> c.accept(t));
}
private static <T> T balancedReduce(List<T> l, BinaryOperator<T> f, int start, int end) {
    if(end-start>2) {
        int mid=(start+end)>>>1;
        return f.apply(balancedReduce(l, f, start, mid), balancedReduce(l, f, mid, end));
    }
    T t = l.get(start++);
    if(start<end) t = f.apply(t, l.get(start));
    assert start==end || start+1==end;
    return t;
}

The code will provide a single Consumer just using a loop when the number of consumers exceeds a threshold. This is the simplest and most efficient solution for a larger number of consumers and in fact, you could drop all other approaches for the smaller numbers and still get a reasonable performance…

Note that this still doesn’t hinder parallel processing of the stream of consumers, if their construction really benefits from it.

Naman
Holger
  • 1
    I can't understand how you can post such a comprehensive answer so fast and how can people upvote it (I assume understand it?) so fast too. – Eugene Jan 23 '20 at 18:43
  • 3
    @Eugene, Naman well, I was already writing the answer when I made my last comment. Of course, it helps having encountered such a problem before. Say [`AWTEventMulticaster`](https://docs.oracle.com/javase/8/docs/api/java/awt/AWTEventMulticaster.html) and too many listeners allowed to experience such a problem even in Java 1.1. Then, I stumbled over it when Java 8 was new and I used reduction for building an evaluation tree, so I already had a variant of the balanced reduction code and only needed to adapt and test it for this answer. – Holger Jan 23 '20 at 18:56
  • 1
    @Naman my assumption is that "him" is not a human being at all, but some sort of hybrid AI. The answer is very interesting (jokes aside), starting from the recursive `toString` in the `DebuggableConsumer` from that balanced Split. I admit that the balancedSplit looks very familiar to the ArrayList Spliterator... but yes, this is extremely nice. – Eugene Jan 23 '20 at 18:59
  • @Holger Made a [minor edit](https://stackoverflow.com/revisions/59885137/2) over the unused `data` in `combineAllTasks`. – Naman Jan 23 '20 at 19:40
  • 1
    @Eugene I have always had the same thought ;). It will take a while for me to fully understand the last part of the answer. Excellent answer as always. – Thiyagu Jan 24 '20 at 04:34

Even if the Stream<Consumer<T>> is made parallel, the resulting compound Consumer will execute the individual consumers in order, assuming:

  • The Stream is ordered.
    A stream sourced by a List is ordered, even with parallel enabled.

  • The accumulator passed to reduce() is associative.
    Consumer::andThen is associative.

Let's say you have a list of 4 consumers [A, B, C, D]. Normally, without parallel, the following would happen:

x = A.andThen(B);
x = x.andThen(C);
compound = x.andThen(D);

so that calling compound.accept() would call A, B, C, then D in that order.

If you enable parallel, the stream framework might instead split that to be processed by 2 threads, [A, B] by thread 1, and [C, D] by thread 2.

That means the following will happen:

x = A.andThen(B);
y = C.andThen(D);
compound = x.andThen(y);

The result is that x runs first, which means A then B, and then y runs, which means C then D.

So although the compound consumer is built like [[A, B], [C, D]] instead of the left-associative [[[A, B], C], D], the 4 consumers are executed in order, all because Consumer::andThen is associative.
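
The two groupings can be compared directly with a short sketch. AssociativityCheck and the named(...) helper are invented for this illustration; each consumer just appends its name to a StringBuilder so the execution order becomes visible.

```java
import java.util.function.Consumer;

class AssociativityCheck {
    // Hypothetical helper: a consumer that appends its name to the builder it receives.
    static Consumer<StringBuilder> named(String name) {
        return sb -> sb.append(name);
    }

    // Built the way a sequential reduce would: [[[A, B], C], D]
    static String leftNested() {
        Consumer<StringBuilder> x = named("A").andThen(named("B"));
        x = x.andThen(named("C"));
        Consumer<StringBuilder> compound = x.andThen(named("D"));
        StringBuilder sb = new StringBuilder();
        compound.accept(sb);
        return sb.toString();
    }

    // Built the way a two-way split might: [[A, B], [C, D]]
    static String pairwise() {
        Consumer<StringBuilder> x = named("A").andThen(named("B"));
        Consumer<StringBuilder> y = named("C").andThen(named("D"));
        Consumer<StringBuilder> compound = x.andThen(y);
        StringBuilder sb = new StringBuilder();
        compound.accept(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(leftNested());   // prints ABCD
        System.out.println(pairwise());     // prints ABCD
    }
}
```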

Andreas
  • Thanks a lot for the answer. I probably knew the order and associative part, still, a validation with an example of processing in threads ease in understanding further. Just a point though and you must have read Holger's answer anyway, I cannot mark both the answers. Thanks a lot again. :) – Naman Jan 24 '20 at 07:41