While reading AsyncSubscriber.java, I noticed that the author uses an AtomicBoolean to create happens-before relationships. I want to know:
1_ Is it equivalent to using a synchronized block?
It looks to me as if the line
if (on.get())
doesn't ensure that the block
try {
    final Signal s = inboundSignals.poll(); // We take a signal off the queue
    if (!done) { // If we're done, we shouldn't process any more signals, obeying rule 2.8
        // Below we simply unpack the `Signal`s and invoke the corresponding methods
        if (s instanceof OnNext<?>)
            handleOnNext(((OnNext<T>)s).next);
        else if (s instanceof OnSubscribe)
            handleOnSubscribe(((OnSubscribe)s).subscription);
        else if (s instanceof OnError) // We are always able to handle OnError, obeying rule 2.10
            handleOnError(((OnError)s).error);
        else if (s == OnComplete.Instance) // We are always able to handle OnComplete, obeying rule 2.9
            handleOnComplete();
    }
}
will be executed by only one thread at a time.
Indeed, when on.get() returns true, what prevents another thread from entering the critical section?
2_ Is it more efficient than a synchronized block? (given that AtomicBoolean uses a volatile variable)
Here is the relevant part of the code:
// We are using this `AtomicBoolean` to make sure that this `Subscriber` doesn't run concurrently with itself,
// obeying rule 2.7 and 2.11
private final AtomicBoolean on = new AtomicBoolean(false);

@SuppressWarnings("unchecked")
@Override public final void run() {
    if(on.get()) { // establishes a happens-before relationship with the end of the previous run
        try {
            final Signal s = inboundSignals.poll(); // We take a signal off the queue
            if (!done) { // If we're done, we shouldn't process any more signals, obeying rule 2.8
                // Below we simply unpack the `Signal`s and invoke the corresponding methods
                if (s instanceof OnNext<?>)
                    handleOnNext(((OnNext<T>)s).next);
                else if (s instanceof OnSubscribe)
                    handleOnSubscribe(((OnSubscribe)s).subscription);
                else if (s instanceof OnError) // We are always able to handle OnError, obeying rule 2.10
                    handleOnError(((OnError)s).error);
                else if (s == OnComplete.Instance) // We are always able to handle OnComplete, obeying rule 2.9
                    handleOnComplete();
            }
        } finally {
            on.set(false); // establishes a happens-before relationship with the beginning of the next run
            if(!inboundSignals.isEmpty()) // If we still have signals to process
                tryScheduleToExecute(); // Then we try to schedule ourselves to execute again
        }
    }
}

// What `signal` does is that it sends signals to the `Subscription` asynchronously
private void signal(final Signal signal) {
    if (inboundSignals.offer(signal)) // No need to null-check here as ConcurrentLinkedQueue does this for us
        tryScheduleToExecute(); // Then we try to schedule it for execution, if it isn't already
}

// This method makes sure that this `Subscriber` is only executing on one Thread at a time
private final void tryScheduleToExecute() {
    if(on.compareAndSet(false, true)) {
        try {
            executor.execute(this);
        } catch(Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully and not violate rule 2.13
            if (!done) {
                try {
                    done(); // First of all, this failure is not recoverable, so we need to cancel our subscription
                } finally {
                    inboundSignals.clear(); // We're not going to need these anymore
                    // This subscription is cancelled by now, but letting the Subscriber become schedulable again means
                    // that we can drain the inboundSignals queue if anything arrives after clearing
                    on.set(false);
                }
            }
        }
    }
}
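
To make question 1_ concrete, here is a minimal sketch of how I understand the guard in the code above. All names in it (SerialDrainSketch, demo, active, maxActive, allProcessed, processed) are my own and hypothetical, not part of AsyncSubscriber.java, and the signal handling is reduced to counting. If my reading is right, it is the compareAndSet in tryScheduleToExecute, not the on.get() in run(), that keeps two runs from overlapping.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, stripped-down version of the pattern; not the original class.
final class SerialDrainSketch implements Runnable {
    private final ConcurrentLinkedQueue<Integer> inbound = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean on = new AtomicBoolean(false);
    private final AtomicInteger active = new AtomicInteger();    // runs inside the try body right now
    private final AtomicInteger maxActive = new AtomicInteger(); // highest overlap observed
    private final ExecutorService executor;
    private final CountDownLatch allProcessed;
    private int processed; // plain field: written only inside run()

    SerialDrainSketch(ExecutorService executor, int expectedSignals) {
        this.executor = executor;
        this.allProcessed = new CountDownLatch(expectedSignals);
    }

    void signal(int value) {
        if (inbound.offer(value)) tryScheduleToExecute();
    }

    private void tryScheduleToExecute() {
        // Only the caller that flips false -> true submits the task,
        // so at most one submitted run is outstanding at any time.
        if (on.compareAndSet(false, true)) executor.execute(this);
    }

    @Override public void run() {
        if (on.get()) { // volatile read, pairs with on.set(false) of the previous run
            try {
                maxActive.accumulateAndGet(active.incrementAndGet(), Math::max);
                if (inbound.poll() != null) {
                    processed++;              // unsynchronized on purpose
                    allProcessed.countDown(); // also publishes `processed` to the waiter
                }
                active.decrementAndGet();
            } finally {
                on.set(false); // volatile write, opens the gate for the next run
                if (!inbound.isEmpty()) tryScheduleToExecute();
            }
        }
    }

    // Sends `signals` values from the calling thread and returns {processed, maxActive}.
    static int[] demo(int signals) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        SerialDrainSketch s = new SerialDrainSketch(pool, signals);
        try {
            for (int i = 0; i < signals; i++) s.signal(i);
            if (!s.allProcessed.await(10, TimeUnit.SECONDS))
                throw new IllegalStateException("timed out waiting for signals");
            return new int[] { s.processed, s.maxActive.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = demo(1000);
        System.out.println("processed=" + result[0] + ", maxActive=" + result[1]);
    }
}
```

In this sketch a second run can only be submitted after some thread wins compareAndSet(false, true), and that can only happen after the previous run has executed on.set(false). So the on.get() check alone would indeed not exclude anyone; it seems to matter only for visibility.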
3_ Is it safe?
4_ Is it commonly used for this purpose (creating a happens-before relationship)?
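
For question 1_, this is roughly the synchronized variant I am comparing against. It is only a sketch under my own assumptions: the class and all its members (SynchronizedDrainSketch, lock, demo, and the counters) are hypothetical, one task is submitted per signal, and the signal handling is again reduced to counting.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical synchronized counterpart, written only for comparison.
final class SynchronizedDrainSketch implements Runnable {
    private final ConcurrentLinkedQueue<Integer> inbound = new ConcurrentLinkedQueue<>();
    private final Object lock = new Object();
    private final ExecutorService executor;
    private final CountDownLatch allProcessed;
    private int processed; // guarded by lock
    private int active;    // guarded by lock
    private int maxActive; // guarded by lock

    SynchronizedDrainSketch(ExecutorService executor, int expectedSignals) {
        this.executor = executor;
        this.allProcessed = new CountDownLatch(expectedSignals);
    }

    void signal(int value) {
        // One task per signal; nothing stops two of them from starting at once.
        if (inbound.offer(value)) executor.execute(this);
    }

    @Override public void run() {
        synchronized (lock) { // a second pool thread blocks here until the first leaves
            active++;
            maxActive = Math.max(maxActive, active);
            if (inbound.poll() != null) {
                processed++;
                allProcessed.countDown();
            }
            active--;
        } // monitor exit happens-before the next monitor enter on the same lock
    }

    // Sends `signals` values from the calling thread and returns {processed, maxActive}.
    static int[] demo(int signals) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        SynchronizedDrainSketch s = new SynchronizedDrainSketch(pool, signals);
        try {
            for (int i = 0; i < signals; i++) s.signal(i);
            if (!s.allProcessed.await(10, TimeUnit.SECONDS))
                throw new IllegalStateException("timed out waiting for signals");
            synchronized (s.lock) { // read the guarded fields under the same lock
                return new int[] { s.processed, s.maxActive };
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = demo(500);
        System.out.println("processed=" + result[0] + ", maxActive=" + result[1]);
    }
}
```

As far as I can tell, both variants keep the processing single-threaded. The difference I would expect is that here pool threads can block on the monitor, whereas the AtomicBoolean version skips scheduling instead of blocking. Whether that makes it more efficient in practice is what I am asking in question 2_.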