In my project, a Spring scheduler periodically scans "TO BE DONE" tasks from the DB and distributes them to a task consumer for subsequent handling. The current implementation places a Reactor Sinks.Many between the producer and the consumer.
Sinks.Many<Task> taskSink = Sinks.many().multicast().onBackpressureBuffer(1000, false);
Producer:
Flux<Date> dates = loadDates();
dates.filterWhen(...)
.concatMap(date -> taskManager.getTaskByDate(date))
.doOnNext(taskSink::tryEmitNext)
.subscribe();
Consumer:
taskProcessor.process(taskSink.asFlux())
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
Using the Sink works fine in most cases. But when the system is under heavy load, the system maintainer wants to know:
- How many tasks are still sitting in the Sink?
- Is it possible to clear all tasks within the Sink?
- Is it possible to prioritize tasks within the Sink?
Unfortunately, a Sink cannot fulfill all of the needs mentioned above. So I created a wrapper class that combines a Map and a PriorityBlockingQueue, based on the implementation in this answer: https://stackoverflow.com/a/71009712/19278017.
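To make the idea concrete, here is a simplified sketch of what such a wrapper might look like. It is not the class from the linked answer: the name PriorityMergingQueueSketch, the key-extractor function and the "drop a task whose key is already queued" merge rule are assumptions made for illustration. Only enqueue and the timed poll mirror the calls used in my code.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch: a priority queue that holds at most one waiting task per key.
class PriorityMergingQueueSketch<K, T> {
    // key -> task currently waiting in the queue (used to detect duplicates)
    private final Map<K, T> pending = new ConcurrentHashMap<>();
    private final PriorityBlockingQueue<T> queue;
    private final Function<T, K> keyOf;

    PriorityMergingQueueSketch(Function<T, K> keyOf, Comparator<T> priority) {
        this.keyOf = keyOf;
        this.queue = new PriorityBlockingQueue<>(16, priority);
    }

    /** Queues the task unless one with the same key is already waiting. */
    synchronized boolean enqueue(T task) {
        if (pending.putIfAbsent(keyOf.apply(task), task) != null) {
            return false; // assumed merge rule: keep the task that is already queued
        }
        return queue.add(task);
    }

    /** Waits up to the timeout for the highest-priority task; returns null if none arrived. */
    T poll(long timeout, TimeUnit unit) throws InterruptedException {
        T task = queue.poll(timeout, unit);
        if (task != null) {
            // Remove the key only if it still maps to this exact task.
            pending.remove(keyOf.apply(task), task);
        }
        return task;
    }

    /** Number of tasks still waiting. */
    int size() {
        return queue.size();
    }

    /** Drops every waiting task. */
    synchronized void clear() {
        queue.clear();
        pending.clear();
    }
}
```

With this shape, size() answers "how many tasks are still waiting", clear() empties the backlog, and the Comparator passed to the constructor decides which task is handed out first.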
After that, the original producer-consumer code was revised as below:
Task queue:
MergingQueue<Task> taskQueue = new PriorityMergingQueue<>();
Producer:
Flux<Date> dates = loadDates();
dates.filterWhen(...)
.concatMap(date -> taskManager.getTaskByDate(date))
.doOnNext(taskQueue::enqueue)
.subscribe();
Consumer:
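taskProcessor.process(Flux.create(sink -> {
    sink.onRequest(n -> {
        Task task;
        try {
            // Keep polling until n tasks are delivered or the subscriber cancels.
            while (!sink.isCancelled() && n > 0) {
                if ((task = taskQueue.poll(1, TimeUnit.SECONDS)) != null) {
                    sink.next(task);
                    n--;
                }
            }
        } catch (InterruptedException e) { // assuming poll() throws this, like BlockingQueue.poll
            ....
        }
    });
}))
.subscribeOn(Schedulers.boundedElastic())
.subscribe();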
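For anyone who wants to reproduce the blocking behaviour without Reactor, the same loop can be isolated with plain JDK classes. This is only a sketch: DrainLoopSketch, drain, the AtomicBoolean standing in for sink.isCancelled() and the 100 ms timeout are all names and values made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-alone version of the loop inside sink.onRequest.
class DrainLoopSketch {
    /**
     * Hands up to n items to downstream and returns how many were delivered.
     * The loop exits only when n items were delivered, cancelled is set,
     * or the thread is interrupted; on an empty queue it keeps polling.
     */
    static <T> long drain(BlockingQueue<T> queue, long n,
                          AtomicBoolean cancelled, Consumer<T> downstream) {
        long delivered = 0;
        try {
            while (!cancelled.get() && delivered < n) {
                T item = queue.poll(100, TimeUnit.MILLISECONDS);
                if (item != null) {
                    downstream.accept(item);
                    delivered++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and stop draining
        }
        return delivered;
    }
}
```

The point of the sketch is that the calling thread stays inside drain for as long as fewer than n items have arrived and nobody has cancelled, which is the same shape as each onRequest callback in my consumer.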
I have some questions:
- Is calling .poll() in this code a problem? I came across a thread hang issue during longevity testing, and I'm not sure whether it's due to the poll() call.
- Is there an alternative solution in Reactor that works like a PriorityBlockingQueue?