4

I have the following scenario whereby my program is using blocking queue to process message asynchronously. There are multiple RSocket clients who wish to receive this message. My design is such a way that when a message arrives in the blocking queue, the stream that binds to the Flux will emit. I have tried to implement this requirement as below, but the client doesn't receive any response. However, I could see Stream supplier getting triggered correctly.

Can someone pls help.

  @MessageMapping("addListenerHook")
public Flux<QueryResult> addListenerHook(String clientName){
    System.out.println("Adding Listener:"+clientName);
    BlockingQueue<QueryResult> listenerQ = new LinkedBlockingQueue<>();
    Datalistener.register(clientName,listenerQ);

    return Flux.fromStream(
            ()-> Stream.generate(()->streamValue(listenerQ))).map(q->{
        System.out.println("I got an event : "+q.getResult());
        return q;
    });
}


private QueryResult streamValue(BlockingQueue<QueryResult> inStream){
    try{
        return inStream.take();
    }catch(Exception e){
        return null;
    }
}
Saji
  • 111
  • 6
  • Can you change Datalistener? – Yuri Schimke Dec 16 '20 at 19:15
  • Sorry I didn't get you. You mean did I change the program as you have suggested below i.e. .subscribeOn(Schedulers.elastic()). Can you please explain it better ? – Saji Dec 18 '20 at 10:25
  • BlockingQueue is inherently incompatible with Rx semantics. It basically offers two main read APIs, polling or blocking take. Neither one works for what you want. So you can either a) create ObservableQueue like the example link below. b) be prepared to donate an IO thread to sit in a tight blocking loop, see the answer below, c) change Datalistener to stop using a BlockingQueue – Yuri Schimke Dec 18 '20 at 10:30

1 Answers1

1

This is tough to solve simply and cleanly because of the blocking API. I think this is why there aren't simple bridge APIs here to help you implement this. You should come up with a clean solution to turn the BlockingQueue into a Flux first. Then the spring-boot part becomes a non-event.

This is why the correct solution is probably involving a custom BlockingQueue implementation like ObservableQueue in https://www.nurkiewicz.com/2015/07/consuming-javautilconcurrentblockingque.html

A alternative approach is in How can I create reactor Flux from a blocking queue?

If you need to retain the LinkedBlockingQueue, a starting solution might be something like the following.

  val f = flux<String> {
    val listenerQ = LinkedBlockingQueue<QueryResult>()
    Datalistener.register(clientName,listenerQ);
    
    while (true) {
      send(bq.take())
    }
  }.subscribeOn(Schedulers.elastic())

With an API like flux you should definitely avoid any side effects before the subscribe, so don't register your listener until inside the body of the method. But you will need to improve this example to handle cancellation, or however you cancel the listener and interrupt the thread doing the take.

Yuri Schimke
  • 12,435
  • 3
  • 35
  • 69