Part 1. Observe vs. Subscribe
Looking into the question, I see the need to observe elements after execution on a particular thread.
To be precise, observe in this context means being able to work on a value in the stream on some specific thread. RxJava has an operator named exactly that (observeOn); in Project Reactor, the identical operation is called publishOn.
Thus, if you want to process data on Schedulers.boundedElastic(), you should use the following construction:
Mono.fromFuture(..)
.publishOn(Schedulers.boundedElastic())
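To make the construction above concrete, here is a minimal runnable sketch. It assumes reactor-core is on the classpath; the class name PublishOnExample, the method name processingThread, and the placeholder future that yields "value" are all illustrative and not taken from the question.

```java
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class PublishOnExample {

    // Returns the name of the thread on which the map step ran.
    static String processingThread() {
        // Placeholder future; in real code this comes from your async API.
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "value");
        return Mono.fromFuture(future)
                // Everything below this line is processed on boundedElastic.
                .publishOn(Schedulers.boundedElastic())
                .map(v -> Thread.currentThread().getName())
                .block();
    }

    public static void main(String[] args) {
        // Prints a thread name that starts with "boundedElastic-".
        System.out.println(processingThread());
    }
}
```

The map step stands in for whatever processing you need; it sees the value on a boundedElastic thread no matter which thread completed the future.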
But wait, .subscribeOn worked as well???
Reading the previous construction, you may start worrying because you are 100% sure that
Mono.fromRunnable(..)
.subscribeOn(Schedulers.boundedElastic())
sends onNext on the thread boundedElastic-1, so what is wrong with doing the same with fromFuture?
And here comes the trick:
Never use subscribeOn with Future / CompletableFuture or anything else that may use its own async mechanism underneath.
If you look at what is going on behind subscribeOn, you will find something like the following:
// Simplified version of the SubscribeOn operator
// (scheduler and parent are fields of the operator)
Scheduler scheduler;
Publisher<T> parent;

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    scheduler.schedule(() -> parent.subscribe(actual));
}
This basically means that the parent's subscribe method is called on a separate thread.
This technique works for fromRunnable, fromSupplier, and fromCallable because their logic runs inside the subscribe method:
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    Operators.MonoSubscriber<T, T> sds = new Operators.MonoSubscriber<>(actual);
    actual.onSubscribe(sds);
    // skipped some parts
    T t = supplier.get();
    if (t == null) {
        sds.onComplete();
    }
    else {
        sds.complete(t);
    }
}
This means it is almost equivalent to:
scheduler.schedule(() -> {
    T t = supplier.get();
    if (t == null) {
        sds.onComplete();
    }
    else {
        sds.complete(t);
    }
});
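The same mechanics can be reproduced without Reactor. The following is only a toy model using plain JDK types: Source, fromSupplier, subscribeOn, and deliveryThread are hypothetical names invented for this sketch and are not Reactor's classes. It shows that when all the work happens inside subscribe, moving the subscribe call onto another thread moves the emission too.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Toy model only: these are stand-ins for the idea, not Reactor types.
class SubscribeOnSketch {

    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    // All the work happens inside subscribe(), on whichever thread calls it.
    static <T> Source<T> fromSupplier(Supplier<T> supplier) {
        return downstream -> downstream.accept(supplier.get());
    }

    // Moves the call to parent.subscribe() onto the executor.
    static <T> Source<T> subscribeOn(Source<T> parent, Executor executor) {
        return downstream -> executor.execute(() -> parent.subscribe(downstream));
    }

    // Returns the name of the thread that delivered the value downstream.
    static String deliveryThread() {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            CompletableFuture<String> seenOn = new CompletableFuture<>();
            Source<String> source = subscribeOn(fromSupplier(() -> "value"), worker);
            source.subscribe(v -> seenOn.complete(Thread.currentThread().getName()));
            return seenOn.join();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(deliveryThread()); // prints "worker"
    }
}
```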
In contrast, fromFuture is much trickier.
A short quiz.
On which thread may we observe the value? (Assume the code runs on the main thread and the task executes on the ForkJoinPool.)
var future = CompletableFuture
    .supplyAsync(() -> {
        return value;
    });
... // some code here; it does not matter what, just code
future.thenAccept(v -> {
    System.out.println(Thread.currentThread());
});
And the correct answer...
It may be the main thread, or it may be a thread from the ForkJoinPool, because it is racy. At the point where we consume the value, it may already have been delivered, so we just read a volatile field on the reader thread (the main thread). Otherwise, the main thread only registers the acceptor, and the acceptor is invoked later on the ForkJoinPool thread.
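Both outcomes of the quiz can be forced deterministically with plain JDK code. The sketch below is illustrative: the class and method names are made up, and it uses a single named thread ("pool-worker") instead of the ForkJoinPool so the result is easy to read. A latch holds the task back in the second case so that the callback is guaranteed to be attached before the value exists.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class FutureCallbackThread {

    // Case 1: the value is already there, so the callback runs on the caller.
    static String attachAfterCompletion(ExecutorService pool) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "value", pool);
        future.join(); // wait until the value has been delivered
        AtomicReference<String> seenOn = new AtomicReference<>();
        future.thenAccept(v -> seenOn.set(Thread.currentThread().getName()));
        return seenOn.get();
    }

    // Case 2: the callback is attached first, so the completing thread runs it.
    static String attachBeforeCompletion(ExecutorService pool) {
        CountDownLatch gate = new CountDownLatch(1);
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                gate.await(); // hold the task until the callback is attached
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "value";
        }, pool);
        AtomicReference<String> seenOn = new AtomicReference<>();
        CompletableFuture<Void> done =
                future.thenAccept(v -> seenOn.set(Thread.currentThread().getName()));
        gate.countDown();
        done.join(); // wait for the callback to finish
        return seenOn.get();
    }

    // Returns {thread seen in case 1, thread seen in case 2}.
    static String[] demo() {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "pool-worker"));
        try {
            return new String[] { attachAfterCompletion(pool), attachBeforeCompletion(pool) };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] result = demo();
        System.out.println("attached after completion:  " + result[0]);
        System.out.println("attached before completion: " + result[1]);
    }
}
```

In real code nothing holds the task back, so which of the two cases you hit depends purely on timing.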
Right, that is why, when you use fromFuture with subscribeOn, there is no guarantee that the subscribeOn thread will observe the value of the given CompletableFuture.
That is why publishOn is the only way to ensure that value processing happens on the desired thread.
Alright, should I use publishOn all the way down???
Yes and no. It depends.
If you use Mono, in 99% of cases you may use publishOn. If you want to make sure that your data processing happens on a particular thread, always use publishOn.
Do not worry about potential overhead: Project Reactor takes care of you even if you used it accidentally. Project Reactor has several optimizations that may replace your publishOn with subscribeOn at runtime (when that is safe and does not change the behavior), so you get the best result.
Part 2. Falling down the rabbit hole of Schedulers
Never ever use Schedulers.immediate(). It is an almost no-op scheduler, which basically does:
Schedulers.immediate().schedule(runnable) {
    runnable.run()
}
Right, it does nothing useful for Reactor users, and we use it only for internal needs.
Alright, so how can I then use a Scheduler as an executor in the imperative world?
There are two options:
Fast path: Step by Step guide
1.a) Create your bounded Executor (e.g. Executors.fixed...)
1.b) Create your bounded ScheduledExecutorService if you want the power of periodic and delayed tasks
2) Create a Scheduler from your executor using the Schedulers.fromExecutorXXX API
3) Use your bounded Executor in the imperative world, and use your Scheduler, which is a wrapper around the bounded one, in the reactive world
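The steps above can be sketched as follows, assuming reactor-core is on the classpath. The choices here are illustrative assumptions, not part of the original recipe: a fixed pool of one thread named "shared-worker", Schedulers.fromExecutorService as the concrete fromExecutorXXX variant, and the class name SharedExecutorExample.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class SharedExecutorExample {

    // Returns {thread used by the imperative task, thread used by the reactive step}.
    static String[] run() throws Exception {
        // 1.a) A bounded executor; pool size and thread name are arbitrary here.
        ExecutorService executor =
                Executors.newFixedThreadPool(1, r -> new Thread(r, "shared-worker"));
        // 2) A Scheduler that wraps the same executor.
        Scheduler scheduler = Schedulers.fromExecutorService(executor);
        try {
            // 3) Imperative world: submit work to the executor directly.
            Callable<String> task = () -> Thread.currentThread().getName();
            Future<String> imperative = executor.submit(task);
            // 3) Reactive world: hand the wrapping Scheduler to publishOn.
            String reactive = Mono.just("value")
                    .publishOn(scheduler)
                    .map(v -> Thread.currentThread().getName())
                    .block();
            return new String[] { imperative.get(), reactive };
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] threads = run();
        System.out.println("imperative: " + threads[0]);
        System.out.println("reactive:   " + threads[1]);
    }
}
```

Both worlds share one bounded pool, so the limit you set on the executor applies to reactive and imperative work together.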
Long path
Coming soon...
Part 3. How to serialize executions.
Coming soon