
I have an infinite Flux of events (from Kafka, using reactor-kafka) that I'm trying to write in batches to a database before continuing to the actual event processing. My problem is getting this to work with proper backpressure.

windowTimeout and bufferTimeout seemed like good candidates, as they let me specify both a maximum batch size and a maximum time to wait when "traffic" is low.
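To make the "max size or max wait, whichever comes first" semantics concrete, here is a minimal JDK-only sketch. The class and method names are hypothetical, and this is not how Reactor implements bufferTimeout. The clock starts when `nextBatch` is called. One difference is relevant to the backpressure problem: the sketch is pull-based, so a batch is only assembled when the caller asks for one. bufferTimeout's timer, by contrast, tries to push a batch downstream when it fires, whether or not one has been requested.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only illustration of size-or-timeout batching (not Reactor code).
final class SizeOrTimeBatcher {
    private SizeOrTimeBatcher() {}

    /**
     * Collects up to maxSize elements from the queue, returning early once maxWait
     * has elapsed since the call started. Returns an empty list if nothing arrived.
     */
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxSize, Duration maxWait) {
        List<T> batch = new ArrayList<>(maxSize);
        long deadline = System.nanoTime() + maxWait.toNanos();
        try {
            while (batch.size() < maxSize) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // time is up: hand back whatever has been collected
                }
                T next = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (next == null) {
                    break; // nothing more arrived before the deadline
                }
                batch.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag, return the partial batch
        }
        return batch;
    }
}
```

For example, with 250 elements already queued, three calls with maxSize 100 return batches of 100, 100 and 50, the last one after waiting out maxWait.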

I first tried windowTimeout, doing the bulk writes to the database from each window. That quickly turned out to be problematic: reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...).

I then switched to bufferTimeout, but that failed with the error reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests.
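As far as I understand it, the bufferTimeout error follows from the Reactive Streams rule that a publisher may only signal onNext against outstanding demand. concatMap requests only a limited number of batches up front and asks for more as each inner Mono completes, so when the database write is slow the outstanding demand can drop to zero. If the timeout fires at that moment, the operator holds a batch it is not allowed to deliver. The toy model below (a hypothetical class, not Reactor's code) captures just that decision point:

```java
import java.util.List;

// Hypothetical toy model of demand tracking; not Reactor's bufferTimeout implementation.
final class DemandTrackingSink<T> {
    private long outstanding; // demand granted by the subscriber and not yet used
    private int delivered;    // batches actually handed downstream

    /** The subscriber grants demand, like Subscription.request(n). */
    synchronized void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        outstanding += n; // overflow handling omitted in this toy
    }

    /** Called when a batch is ready, e.g. because a timer fired. */
    synchronized void emit(List<T> batch) {
        if (outstanding == 0) {
            // A timer-driven flush cannot wait here; its options are to fail,
            // drop the batch, or buffer it without bound.
            throw new IllegalStateException("Could not emit buffer due to lack of requests");
        }
        outstanding--;
        delivered++;
    }

    synchronized int delivered() {
        return delivered;
    }
}
```

Failing is the choice that produces the exception above; buffering without bound is essentially what onBackpressureBuffer does.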

I hope the following illustrates the flow I'm after:

flux.groupBy(envelope -> envelope.partition)
  .flatMap(partitionFlux -> {
    final Flux<ConsumedEnvelope> elasticFlux = partitionFlux.publishOn(Schedulers.elastic());
    final Flux<List<ConsumedEnvelope>> batchFlux = partitionFlux.bufferTimeout(100, Duration.ofMillis(500))
      .concatMap(batch -> {
        final ConsumedEnvelope last = batch.get(batch.size() - 1);

        return repository.persist(batch) // a)
          .then(last.acknowledge()) // b)
          .thenReturn(batch);
      });

    return processing(batchFlux);
  })
  .subscribe(result -> {
      // ...
  });

a) repository.persist internally does nothing but iterate over the batch to build the insert operation, and then returns a Mono<Void>.

b) ConsumedEnvelope.acknowledge() handles the Kafka offset acknowledgement, which I only want to do after the batch has been persisted successfully. Everything is wrapped in concatMap so that only a single batch at a time is processed for each partition.
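The ordering in a) and b) (persist the batch, then acknowledge its last envelope, one batch at a time) can be sketched without Reactor using CompletableFuture chaining. `persist` and `acknowledge` here are hypothetical stand-ins for repository.persist and ConsumedEnvelope.acknowledge(), and thenCompose plays the role of concatMap/then:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical JDK-only sketch: persist each batch, then acknowledge its last element,
// strictly one batch after another. Not the Reactor pipeline itself.
final class PersistThenAck {
    private PersistThenAck() {}

    static <T> CompletableFuture<Void> processSequentially(
            List<List<T>> batches,
            Function<List<T>, CompletableFuture<Void>> persist,
            Function<T, CompletableFuture<Void>> acknowledge) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (List<T> batch : batches) {
            if (batch.isEmpty()) {
                continue; // nothing to persist or acknowledge
            }
            T last = batch.get(batch.size() - 1);
            // The next batch starts only after the previous acknowledgement completes,
            // and acknowledge runs only if persist completed normally. A failure
            // short-circuits every later stage.
            chain = chain
                    .thenCompose(ignored -> persist.apply(batch))
                    .thenCompose(ignored -> acknowledge.apply(last));
        }
        return chain;
    }
}
```

One Reactor-specific detail this sketch mirrors: the lambdas passed to thenCompose run only after the previous stage completes. In the original chain, `last.acknowledge()` is called as soon as the concatMap lambda runs, before persist has finished; only the Mono it returns is subscribed afterwards. If acknowledge() does any work eagerly, `.then(Mono.defer(last::acknowledge))` keeps it strictly after the write.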

As mentioned above, this results in an OverflowException. Is there an idiomatic way of achieving what I've described? It seems to me that this shouldn't be an uncommon task, but I'm new to Reactor and would love some advice.

/d

EDIT: I realised that simply adding onBackpressureBuffer actually solves this well enough for me. But in general, are there better ways of doing this?

EDIT 2: ...the above did, of course, cause issues due to the unbounded demand, which I somehow missed. So it's back to the original problem, or perhaps to finding some way of having onBackpressureBuffer NOT request unbounded demand but only forward what's requested from downstream.
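The property EDIT 2 asks for is a buffer that holds a bounded number of elements and lets the producer advance only as fast as the consumer drains it. In plain threaded Java, a bounded blocking queue provides exactly that. The sketch below is a blocking analogue only, with hypothetical names, not a Reactor operator: in Reactor the equivalent bound comes from request(n) rather than from a blocking put. It runs a producer against a slow batch consumer and reports how far ahead the producer ever got. The consumer waits up to maxWaitMillis for a first element, then takes whatever else is already queued (up to maxBatch) rather than waiting for a full batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical blocking analogue of bounded demand; not a Reactor operator.
final class BoundedBatchPipeline {
    private BoundedBatchPipeline() {}

    /** Returns the largest lead the producer had over the consumer, as seen after each batch. */
    static int run(int total, int capacity, int maxBatch, long maxWaitMillis) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        AtomicInteger produced = new AtomicInteger(); // incremented just before each put

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    produced.incrementAndGet();
                    queue.put(i); // blocks while the queue is full: the producer waits for the consumer
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int consumed = 0;
        int maxLead = 0;
        try {
            while (consumed < total) {
                Integer first = queue.poll(maxWaitMillis, TimeUnit.MILLISECONDS);
                if (first == null) {
                    continue; // nothing arrived in time; wait again
                }
                List<Integer> batch = new ArrayList<>(maxBatch);
                batch.add(first);
                queue.drainTo(batch, maxBatch - 1); // add whatever else is ready, up to maxBatch
                consumed += batch.size();
                maxLead = Math.max(maxLead, produced.get() - consumed);
                Thread.sleep(1); // stand-in for a slow bulk write such as repository.persist
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxLead;
    }
}
```

For example, `BoundedBatchPipeline.run(500, 16, 10, 50)` should report a lead of at most 17 (the capacity plus one element that may be blocked in put), however slow the consumer is. An unbounded buffer gives no such limit.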

dforsl
Looks like this issue has been open for 2+ years now! https://github.com/reactor/reactor-core/issues/1099 The linked issue also mentions a possible workaround. – Vivek Sethi May 29 '20 at 12:26
