9

I have the following code:

interface Device {
    // ...
    boolean isDisconnected();
    void reconnect();
}

interface Gateway {
    // ...
    List<Device> getDevices();
}

...

for (Gateway gateway : gateways) {
    for(Device device : gateway.getDevices()){
        if(device.isDisconnected()){
            device.reconnect();
        }
    }
}

I want to refactor the code using Stream API. My first attempt was like the following:

gateways
    .stream()
    .forEach(
        gateway -> {
            gateway
                .getDevices()
                .parallelStream()
                .filter(device -> device.isDisconnected())
                .forEach(device -> device.reconnect())
            ;
        }
    )
;

I didn't like it so after some modifications I ended up with this code:

gateways
    .parallelStream()
    .map(gateway -> gateway.getDevices().parallelStream())
    .map(stream -> stream.filter(device -> device.isDisconnected()))
    .forEach(stream -> stream.forEach(device -> device.reconnect()))
;

My question is whether there is a way to avoid nested forEach.

candied_orange
  • 7,036
  • 2
  • 28
  • 62

3 Answers3

16

You should flatten the stream of streams using flatMap instead of map:

gateways
    .parallelStream()
    .flatMap(gateway -> gateway.getDevices().parallelStream())
    .filter(device -> device.isDisconnected())
    .forEach(device -> device.reconnect());

I would improve it further by using method references instead of lambda expressions:

gateways
    .parallelStream()
    .map(Gateway::getDevices)
    .flatMap(List::stream)
    .filter(Device::isDisconnected)
    .forEach(Device::reconnect);
ETO
  • 6,970
  • 1
  • 20
  • 37
  • Thank you, it works! Does the code with `method references` have the same performance as the first one? It looks pretty elegant. – Hans van Kessels Dec 17 '18 at 22:15
  • 3
    Yes, it does. Please read [this post](https://stackoverflow.com/questions/24487805/lambda-expression-vs-method-reference) for more detailed explanation. – ETO Dec 17 '18 at 22:17
10

Don't refactor your code into using Streams. You gain no benefits and gain no advantages over doing it like this, since the code is now less readable and less idiomatic for future maintainers.

By not using streams, you avoid nested forEach statements.

Remember: streams are meant to be side-effect free for safer parallelization. forEach by definition introduces side-effects. You lose the benefit of streams and lose readability at the same time, making this less desirable to do at all.

Makoto
  • 104,088
  • 27
  • 192
  • 230
  • Thanks for your answer. This is just my in-home project. I'm trying to learn Stream API by doing some pseudo-real tasks. – Hans van Kessels Dec 17 '18 at 22:11
  • 5
    One way to learn about the Stream API is to know when *not* to apply it. This feels like one of those times, especially since you're not guaranteed that the operations you're performing inside of the stream are side-effect free which would make parallelization *safe*. – Makoto Dec 17 '18 at 22:12
  • 8
    I think with ETO's answer above with `flatMap`, that the stream version is prettier and cleaner than the non-stream version. – Mateen Ulhaq Dec 18 '18 at 07:40
  • @MateenUlhaq: This isn't about writing "pretty" code. This is about idiomatic code. Streams have the ability to be parallelized. If you use `forEach` or any terminating operand with side effects, the ability to parallelize your code weakens *substantially*. If you wanted to do this in parallel, you'd want a different structure to it to ensure that the operation around `Device#reconnect` was actually thread-safe. Doing it like this doesn't give you that assurance. – Makoto Dec 18 '18 at 17:18
4

I would try this with a sequential stream before using a parallel one:

gateways
    .stream()
    .flatMap(gateway -> gateway.getDevices().stream())
    .filter(device -> device.isDisconnected())
    .forEach(device ->  device.reconnect())
;

The idea is to create a stream via gateways.stream() then flatten the sequences returned from gateway.getDevices() via flatMap.

Then we apply a filter operation which acts like the if statement in your code and finally, a forEach terminal operation enabling us to invoke reconnect on each and every device passing the filter operation.

see Should I always use a parallel stream when possible?

candied_orange
  • 7,036
  • 2
  • 28
  • 62
Ousmane D.
  • 54,915
  • 8
  • 91
  • 126