3

I am trying to create a Reactor Flux backed by a BlockingQueue, but I am not sure which operator is best for my use case.

I am creating a streaming REST endpoint whose response is a Flux that needs to keep emitting messages from a BlockingQueue in response to a GET call.

I have already searched forums and the documentation, but I can only find examples of a Flux created from iterable collections or reactive data sources, and none created from a BlockingQueue.

mindhunt3r
  • Add some more description or your code snippet so that others can understand your issue and help you. – ASK Apr 08 '19 at 06:01

2 Answers

11

You can try Flux#generate and Queue#peek. Just keep in mind that peek returns null if the queue is empty, and null cannot be passed to onNext (that is, to sink.next), so the empty case has to be handled separately.

Something like:

Flux.generate(sink -> {
    val element = queue.peek();
    if (element == null) {
        sink.complete();
    } else {
        sink.next(element);
    }
});
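Note that Queue#peek does not remove the head of the queue, so a generator that only peeks would see the same element on every call. Below is a minimal sketch of the same idea using Queue#poll, which removes the element it returns. The class name `QueueDrainExample`, the method `drain`, and the `String` element type are assumptions made for illustration, not part of the original answer.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import reactor.core.publisher.Flux;

public class QueueDrainExample {

    // Emits every element currently in the queue, then completes.
    static Flux<String> drain(Queue<String> queue) {
        return Flux.<String>generate(sink -> {
            // poll() removes and returns the head, or returns null if the queue is empty.
            String element = queue.poll();
            if (element == null) {
                sink.complete();   // null must never be passed to sink.next
            } else {
                sink.next(element);
            }
        });
    }

    public static void main(String[] args) {
        Queue<String> queue = new LinkedBlockingQueue<>(List.of("a", "b", "c"));
        List<String> drained = drain(queue).collectList().block();
        System.out.println(drained); // [a, b, c]
    }
}
```

This Flux completes as soon as the queue is found empty, so on its own it is a snapshot of the queue rather than an endless stream.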

There is also the Flux#repeatWhen operator, in case you want to re-subscribe to the queue after it has been found empty, e.g. with:

flux.repeatWhen(it -> it.delayElements(ofSeconds(1)))
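As a rough sketch of how the two pieces might fit together, the following re-subscribes to a poll-based generator after a pause each time the queue is found empty. The names `RepeatingQueueFlux` and `pollRepeatedly`, the `String` element type, and the use of poll (rather than peek, so that emitted elements are removed) are assumptions for illustration.

```java
import java.time.Duration;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;

public class RepeatingQueueFlux {

    // Drains the queue, waits for `pause`, then re-subscribes and drains again, indefinitely.
    static Flux<String> pollRepeatedly(Queue<String> queue, Duration pause) {
        return Flux.<String>generate(sink -> {
            String element = queue.poll();
            if (element == null) {
                sink.complete();       // queue is empty for now
            } else {
                sink.next(element);
            }
        })
        // Each completion produces one signal in the companion Flux; delaying that
        // signal delays the re-subscription.
        .repeatWhen(completions -> completions.delayElements(pause));
    }

    public static void main(String[] args) throws InterruptedException {
        Queue<String> queue = new LinkedBlockingQueue<>(List.of("a"));
        Disposable subscription =
                pollRepeatedly(queue, Duration.ofMillis(100)).subscribe(System.out::println);
        queue.offer("b");              // picked up by a later re-subscription
        Thread.sleep(500);
        subscription.dispose();        // the Flux never completes on its own
    }
}
```

The resulting Flux never signals completion to the subscriber, which may suit a streaming endpoint better than a Flux that completes when the queue is empty. The trade-off is that new elements are only noticed after the pause elapses.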
bsideup
  • 2
    It worked. However, my client (browser) kept sending the request again and again once the server signalled completion through the sink, so I changed to BlockingQueue#take. I am not sure how that fits into reactive asynchronous processing. – mindhunt3r Apr 09 '19 at 14:50
  • 2
    Be careful because `#take` is blocking. Make sure you subscribe on a blocking-friendly scheduler (e.g. `Schedulers.elastic()`). – bsideup Apr 10 '19 at 07:34
  • 4
    @bsideup, why is `peek()` used here instead of `poll()`, which would remove the element from the queue once it has been pipelined? – Kyr Sep 25 '19 at 15:56
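The blocking `take` variant discussed in the comments above could look roughly like the sketch below. It assumes a `String` queue and uses `Schedulers.boundedElastic()`, since `Schedulers.elastic()` is deprecated in newer Reactor versions. The names `BlockingTakeFlux` and `stream` are hypothetical.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class BlockingTakeFlux {

    // Emits elements as they arrive; never completes on its own.
    static Flux<String> stream(BlockingQueue<String> queue) {
        return Flux.<String>generate(sink -> {
            try {
                sink.next(queue.take());   // blocks until an element is available
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                sink.error(e);
            }
        })
        // Run the blocking generator on a scheduler meant for blocking work,
        // so it does not tie up an event-loop or parallel thread.
        .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("a", "b"));
        List<String> firstTwo = stream(queue)
                .take(2)
                .collectList()
                .block(Duration.ofSeconds(5));
        System.out.println(firstTwo); // [a, b]
    }
}
```

Each subscriber occupies one scheduler thread while it waits on the queue, and every element goes to only one subscriber, so this approach is probably best kept to a small number of concurrent clients.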
2

An alternative worth considering is to get rid of BlockingQueue and use Sinks instead.

This requires:

  1. Creating a sink, e.g.

    private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
    
  2. Exposing the sink as a Flux:

    sink.asFlux()
    
  3. Pushing to the sink:

    sink.tryEmitNext("SOME MESSAGE"); 
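
Put together, the three steps might look like the following sketch. The class name `MessageBroadcaster` and the methods `messages` and `publish` are hypothetical. The result of `tryEmitNext` is checked because it reports failure through its return value instead of throwing.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class MessageBroadcaster {

    // Step 1: a multicast sink that buffers when subscribers are slower than the producer.
    private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

    // Step 2: the view a streaming endpoint could return to its clients.
    public Flux<String> messages() {
        return sink.asFlux();
    }

    // Step 3: producers call this instead of putting messages on a BlockingQueue.
    public boolean publish(String message) {
        Sinks.EmitResult result = sink.tryEmitNext(message);
        return result.isSuccess();
    }

    public static void main(String[] args) {
        MessageBroadcaster broadcaster = new MessageBroadcaster();
        List<String> received = new ArrayList<>();
        broadcaster.messages().subscribe(received::add);

        broadcaster.publish("SOME MESSAGE");
        System.out.println(received); // [SOME MESSAGE]
    }
}
```

If several threads publish concurrently, `tryEmitNext` can return a failure result, so callers may want to retry or log when `publish` returns false.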
    
xmcax