Suppose I have a repository with a findAll() method that returns an Iterable of State, where State is a class representing a US state with two fields (each with a getter and setter): name and population.

I want to get the sum of the population fields for all States in my Flux. I create a Flux from an Iterable, as follows:

Flux<State> f = Flux.fromIterable(stateRepo.findAll());

I've got my Flux, but I don't know of a good way to sum up its values. I've tried something like

int total = 0;
f.map(s -> s.getPopulation()).subscribe(p -> total += p);
return total;

However, the compiler says that total "should be final or effectively final". Adding final obviously won't work, because I'm trying to add to it.
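
For context on that compiler message, here is a minimal sketch in plain Java (no Reactor) of why the lambda cannot update the local variable, and of one common workaround using a mutable holder. The class and method names are hypothetical and chosen only for illustration. With Reactor there is an extra caveat: subscribe() may run asynchronously, so reading a holder right after subscribing is not generally safe, which is why an operator that aggregates inside the pipeline is usually a better fit.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class EffectivelyFinalDemo {

    // Hypothetical helper: sums the values by mutating a holder object.
    static int sumWithHolder(List<Integer> populations) {
        // The variable 'total' is never reassigned, so it is effectively final
        // and the lambda may capture it. Only the holder's contents change.
        AtomicInteger total = new AtomicInteger();
        populations.forEach(p -> total.addAndGet(p));
        return total.get();
    }

    public static void main(String[] args) {
        // Writing "int total = 0; populations.forEach(p -> total += p);" would not
        // compile, because the lambda would reassign a captured local variable.
        System.out.println(sumWithHolder(List.of(1, 2, 3))); // prints 6
    }
}
```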

How do I compute a sum (or any other aggregate) over the elements of a Flux?

Brian Clozel
Mark

2 Answers

Use the reduce method:

@GetMapping("/populations")
public Mono<Integer> getPopulation() {
    return Flux.fromIterable(stateRepo.findAll())
            .map(s -> s.getPopulation())
            .reduce(0, (x1, x2) -> x1 + x2)   // 0 is the initial value of the sum
            .map(this::someFunction);         // optional: post-process the sum
}
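
As a rough illustration of what the seeded reduce computes, the sketch below performs the same fold with java.util.stream so that it runs without Reactor on the classpath. The nested State class and the sample figures are made up for illustration. Flux.reduce(initial, accumulator) takes the same two arguments, but delivers the result asynchronously as a Mono instead of returning it directly.

```java
import java.util.List;

public class PopulationSum {

    // Hypothetical stand-in for the State class described in the question.
    static final class State {
        private final String name;
        private final int population;

        State(String name, int population) {
            this.name = name;
            this.population = population;
        }

        String getName() { return name; }
        int getPopulation() { return population; }
    }

    // Folds all populations into a single total, starting from the seed 0.
    static int totalPopulation(List<State> states) {
        return states.stream()
                .map(State::getPopulation)
                .reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        // Sample data with invented figures.
        List<State> states = List.of(
                new State("A", 10),
                new State("B", 20),
                new State("C", 30));
        System.out.println(totalPopulation(states)); // prints 60
    }
}
```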
mroman
  • Thanks. I went more or less with this answer. I discovered that if the `Iterable` I create the Flux from happens to be empty then I get a NullPointerException. I worked around this by using `.reduce(0, (x1, x2) -> x1 + x2)`, where the 0 arg is the initial value for the aggregate. Also, using `block()` instead of `subscribe()` causes the aggregate value to be returned. – Mark Feb 27 '18 at 21:56
  • Great. I would suggest avoiding the block method in production, because it blocks the calling thread while it waits for the result. If you use Spring Boot and need to process the sum of populations, do it in an extra map step and return the Mono (it is a Mono after reducing) as the controller result. Example: @GetMapping("/populations") public Mono<Integer> getPopulation() { return Flux.fromIterable(stateRepo.findAll()) .map(s -> s.getPopulation()) .reduce(0, (x1, x2) -> x1 + x2) .map(this::someFunction); // here you can handle the sum } – mroman Feb 27 '18 at 22:21
  • Note that this is the more generic approach and works with any type. Can you give details about the NPE? That sounds strange to me. – Simon Baslé Mar 01 '18 at 08:47
  • `.reduce(0, (x1, x2) -> x1 + x2)` can be replaced with `.reduce(0, Integer::sum)` – Iurii Ant Apr 27 '22 at 09:11
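
On the empty-source case raised in the comments, the sketch below contrasts a seeded and an unseeded reduce on an empty input, again using java.util.stream as a stand-in so that it runs without Reactor. The class and method names are hypothetical. In Reactor, the unseeded Flux.reduce(accumulator) completes with an empty Mono when the source is empty, and block() returns null for an empty Mono, so unboxing that result into an int is one plausible source of the NullPointerException. That is an assumption about the original code, not something the thread confirms.

```java
import java.util.List;
import java.util.Optional;

public class EmptyReduceDemo {

    // Seeded fold: an empty input simply yields the seed.
    static int seededSum(List<Integer> values) {
        return values.stream().reduce(0, Integer::sum);
    }

    // Unseeded fold: an empty input yields no value at all.
    static Optional<Integer> unseededSum(List<Integer> values) {
        return values.stream().reduce(Integer::sum);
    }

    public static void main(String[] args) {
        List<Integer> empty = List.of();
        System.out.println(seededSum(empty));             // prints 0
        System.out.println(unseededSum(empty));           // prints Optional.empty
        System.out.println(seededSum(List.of(1, 2, 3)));  // prints 6
    }
}
```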

You can add the reactor-extra package as a Maven dependency:

io.projectreactor.addons:reactor-extra

and then use MathFlux.sumInt(integersFlux), where integersFlux is your Flux of Integer values.
docs: https://projectreactor.io/docs/core/release/reference/#extra-math

Slava