0

I've a use case where the stream should only emit when the cumulative "sum" equals or exceeds a given value, n. Let's take the example of six integers with n = 5.

+---+------+---------+
| i | Emit |   Sum   |
+---+------+---------+
| 1 |    - | 1       |
| 2 |    - | 3       |
| 3 |    5 | 1       |
| 4 |    5 | 0       |
| 5 |    5 | 0       |
| 2 |    2 | 0 (end) |
+---+------+---------+

As you can see, nothing is emitted unless the sum equals or exceeds 5, except for the last element, which is emitted anyway.

Once a value is emitted, the sum is reduced by n. In reality, I'm reading data from a network call and then sending it to a downstream consumer that only accepts fixed-size chunks, except for the last one, of course (upstream completed).
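To make the rule in the table concrete, here is a minimal plain-Java sketch, independent of Reactor. The class and method names (ThresholdEmitter, emit) are hypothetical, and it assumes that a single large input may trigger several emissions of n, a case the six-integer example does not cover.

```java
import java.util.ArrayList;
import java.util.List;

final class ThresholdEmitter {

    // Hypothetical helper: returns the values that would be emitted
    // for the given input and threshold n.
    static List<Integer> emit(List<Integer> input, int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        List<Integer> emitted = new ArrayList<>();
        int sum = 0;
        for (int value : input) {
            sum += value;
            // Assumption: keep emitting n while the running sum allows it.
            while (sum >= n) {
                emitted.add(n);
                sum -= n;
            }
        }
        // Upstream completed: flush the remainder so it is not lost.
        if (sum > 0) {
            emitted.add(sum);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Same input as the table, with n = 5.
        System.out.println(emit(List.of(1, 2, 3, 4, 5, 2), 5)); // prints [5, 5, 5, 2]
    }
}
```

The final flush is the part that a plain running-sum operator cannot express on its own, because it needs to know that no more elements are coming.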

I'm using Project Reactor's Flux as the Publisher; I couldn't find any method on it that allows me to do what is shown above. scan comes closest, but it also emits intermediate elements that need to be filtered out.

Progman
Abhijit Sarkar
    Please edit your question to include a longer example. It looks like it is not just when the sum is equal to or above 5; the value 5 is also subtracted again, which reduces the "sum", at which point the next sum *might* not reach the limit of 5 again. Also, what is the logic behind the last entry, which has to be emitted anyway? Also add what you are trying to do overall; maybe you can solve your problem in a different way (XY problem?). – Progman Jul 26 '20 at 20:45
  • @Progman Edited question. – Abhijit Sarkar Jul 26 '20 at 22:56

3 Answers

1

In reality, I'm reading data from a network call, and subsequently sending them to a downstream consumer who only accepts fixed size chunks, except for the last one, of course (upstream completed).

It occurred to me that trying to split the response Flux myself is probably a little late and quite difficult; instead, I could use something like Netty's FixedLengthFrameDecoder, which does exactly what I'm looking for.

That led me to reactor-netty source code, and after extensive digging, I found exactly what I needed.

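import reactor.netty.http.client.HttpClient

object FixedLengthResponseFrameClient {
    // Limits the size of each decoded chunk to maxChunkSize bytes,
    // then collects all chunks of the response body.
    fun get(url: String, maxChunkSize: Int): List<ByteArray> {
        return HttpClient.create()
            .httpResponseDecoder { it.maxChunkSize(maxChunkSize) }
            .get()
            .uri(url)
            .responseContent()
            .asByteArray()
            .collectList()
            .block()!!
    }
}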

The crucial part is httpResponseDecoder { it.maxChunkSize(maxChunkSize) }; a unit test confirms that it works:

@Test
fun testHonorsMaxChunkSize() {
    val maxChunkSize = 4096
    val chunks = FixedLengthResponseFrameClient.get(
        "http://doesnotexist.nowhere/binary", maxChunkSize
    )

    assertThat(chunks.subList(0, chunks.size - 1))
        .allMatch { it.size == maxChunkSize }
    assertThat(chunks.last().size).isLessThanOrEqualTo(maxChunkSize)
}

WebClient can be configured with a custom HttpClient (configured with httpResponseDecoder) as shown below:

WebClient
  .builder()
  .clientConnector(ReactorClientHttpConnector(httpClient))
  .build()
  .get()
  .uri("uri")
  .exchange()
  .flatMapMany { it.body(BodyExtractors.toDataBuffers()) }
  ...

The size of these buffers would be what's set in HttpClient.httpResponseDecoder (8192 bytes by default).
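For illustration only, the re-chunking that the downstream consumer expects (fixed-size chunks, with a shorter final chunk) can also be sketched by hand over byte arrays that are already in memory. This is not how Netty or reactor-netty implement it; the class and method names (FixedSizeRechunker, rechunk) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FixedSizeRechunker {

    // Hypothetical helper: re-slices arbitrarily sized pieces into chunks of
    // exactly chunkSize bytes; only the last chunk may be shorter.
    static List<byte[]> rechunk(List<byte[]> pieces, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        byte[] current = new byte[chunkSize];
        int filled = 0;
        for (byte[] piece : pieces) {
            int offset = 0;
            while (offset < piece.length) {
                // Copy as much as fits into the chunk being filled.
                int n = Math.min(chunkSize - filled, piece.length - offset);
                System.arraycopy(piece, offset, current, filled, n);
                filled += n;
                offset += n;
                if (filled == chunkSize) {
                    chunks.add(current);
                    current = new byte[chunkSize];
                    filled = 0;
                }
            }
        }
        // Input exhausted: emit the remainder as a shorter last chunk.
        if (filled > 0) {
            chunks.add(Arrays.copyOf(current, filled));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<byte[]> chunks = rechunk(
                List.of(new byte[3], new byte[4], new byte[6]), 5);
        for (byte[] chunk : chunks) {
            System.out.println(chunk.length); // prints 5, 5, 3 on separate lines
        }
    }
}
```

Letting the HTTP decoder bound the chunk size avoids this copying in application code, which is why configuring the decoder is the less error-prone route.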

Abhijit Sarkar
  • Nice solution. I only noticed late the edit about your use case for such a feature, and therefore restricted my answer to the usual Flux operations. – Norbert Dopjera Jul 27 '20 at 16:09
0

This is not possible to do directly on a Flux, but you might find a solution if you have access to the resource from which the Flux is created. Since you cannot access previous elements inside the stream (Flux), you can create a Flux over the indices of your resource and read the resource directly from that Flux of indices (this is safe because it is a read-only operation). For example, something like this:

List<Integer> list = List.of(1, 2, 3, 4, 5, 2);
AtomicReference<Integer> atomicSum = new AtomicReference<>(0);
// Iterate over every index, including the last one
return Flux.fromStream(IntStream.range(0, list.size()).boxed())
        .flatMap(i -> {
            int sum = atomicSum.updateAndGet(integer -> integer + list.get(i));
            if (sum >= 5) {
                atomicSum.updateAndGet(integer -> integer - 5);
                return Flux.just(5);
            }

            return (i.equals(list.size() - 1))
                    ? Flux.just(list.get(i)) // emit last element even if sum was not 5
                    : Flux.empty();
        }); // emitted elements

Note that this is not good practice, and I don't advise such a solution. Processing of a Flux might move between threads, so if you modify an object outside of the Flux you should do it in a synchronized way (hence the AtomicReference). The List is only read, so that is OK. Also, I don't know if this code will actually work, but I wanted to show how you might find a solution if you have access to the resource from which your Flux is created.

Edit: even this solution would not work. I was mistaken: a Flux does not move between threads, but it might be processed by multiple threads, which can leave a single atomic reference in an invalid state. This could still be solved with a synchronization mechanism such as locks instead of an AtomicReference, but that is far beyond the average developer's experience. Are you sure you cannot use the scan() function, since you can provide your own accumulator function as an argument?

Norbert Dopjera
  • I can use `scan`, but as I said, the intermediate elements need to be filtered out. I was hoping for something more elegant. – Abhijit Sarkar Jul 26 '20 at 22:57
  • I can't see any more elegant solution, as `Flux` is quite restricted in this case. Filtering after scan is still much more elegant than the usual implementation, so I guess the answer is to stick with scan and filter out the unwanted intermediate results. – Norbert Dopjera Jul 27 '20 at 00:18
  • Actually, `scan` won’t work because it has no way to know when to emit the last element – Abhijit Sarkar Jul 27 '20 at 01:02
  • I still don't understand the requirement for the last element. Also, why are you emitting the cumulative sum of the required number? Do you need to count how many times the cumulative sum has been reached? Do you need the last element as some form of information about how much had accumulated at the last element? This information would be helpful, since an algorithm consisting of `Flux` operators (higher-order functions) might be adjusted to it. – Norbert Dopjera Jul 27 '20 at 01:24
  • I'm not sure what about the last element is not clear to you. We wouldn't want to drop it, right? Since there are no more elements coming (Flux doesn't know that), if we waited for sum = n, the last element would be lost (assuming it's < n, as in my example). – Abhijit Sarkar Jul 27 '20 at 03:21
  • Got it, see my answer. – Abhijit Sarkar Jul 27 '20 at 06:05
0

If you need to keep a running total, or otherwise maintain state from which your flux is derived, one way to go about it is to create a new flux that subscribes to the first flux and maintains the state within the subscription, e.g.:

Flux<Long> flux = Flux.just(1L, 2L, 3L, 4L, 5L);

Sinks.Many<Long> runningTotalSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Long> runningTotalFlux = runningTotalSink.asFlux()
        .doOnSubscribe(subscription -> {
            AtomicLong runningTotal = new AtomicLong();
            flux
                    .doOnCancel(subscription::cancel)
                    .doOnError(runningTotalSink::tryEmitError)
                    .doOnComplete(runningTotalSink::tryEmitComplete)
                    .subscribe(i -> {
                        runningTotalSink.tryEmitNext(runningTotal.accumulateAndGet(i, Long::sum));
                    });
        });

runningTotalFlux.toStream().forEach(i -> {
    System.out.println(i);
});
Chomeh