So I know this has been asked many times before, but I have tried many things and nothing seems to work.
Let's start with these blogs/articles/code:
- https://blog.danlew.net/2016/01/25/rxjavas-repeatwhen-and-retrywhen-explained/
- https://jimbaca.com/rxjava-retrywhen/
- http://blog.inching.org/RxJava/2016-12-12-rx-java-error-handling.html
- https://pamartinezandres.com/rxjava-2-exponential-backoff-retry-only-when-internet-is-available-5a46188ab175
- https://gist.github.com/wotomas/35006d156a16345349a2e4c8e159e122
And many others.
In a nutshell, all of them describe how you can use retryWhen to implement exponential back-off. Something like this:
    source
        .retryWhen(
            errors -> {
              return errors
                  .zipWith(Observable.range(1, 3), (n, i) -> i)
                  .flatMap(
                      retryCount -> {
                        System.out.println("retry count " + retryCount);
                        return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
                      });
            })
Even the documentation in the library agrees with it: https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/core/Observable.java#L11919.
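As a side check, the delays such a schedule produces can be computed without RxJava at all. The sketch below is plain Java; the class and method names (`BackoffDelays`, `delaySeconds`, `schedule`) are hypothetical and only illustrate the arithmetic. It suggests that with a base of 1, as in the snippet above, `Math.pow(1, retryCount)` is always 1, so every retry waits one second, whereas a base of 2 gives the usual doubling.

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffDelays {

    // Hypothetical helper: delay in seconds before the given retry (1-based).
    public static long delaySeconds(int base, int retryCount) {
        return (long) Math.pow(base, retryCount);
    }

    // Delays for retries 1..maxRetries, mirroring Observable.range(1, maxRetries).
    public static List<Long> schedule(int base, int maxRetries) {
        List<Long> delays = new ArrayList<>();
        for (int i = 1; i <= maxRetries; i++) {
            delays.add(delaySeconds(base, i));
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println("base 1: " + schedule(1, 3)); // base 1: [1, 1, 1]
        System.out.println("base 2: " + schedule(2, 3)); // base 2: [2, 4, 8]
    }
}
```

This does not affect whether the retries happen, only how long each one waits.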
However, I've tried this and several very similar variations, not worth describing here, and nothing seems to work. The examples do work with a blocking subscriber, but I want to avoid blocking threads.
So if we apply a blocking subscriber to the previous observable, like this:
    .blockingForEach(System.out::println);
it works as expected. But that's not the idea. If we try this instead:
    .subscribe(
        x -> System.out.println("onNext: " + x),
        Throwable::printStackTrace,
        () -> System.out.println("onComplete"));
The flow runs only once, which is not what I want to achieve.
Does that mean retryWhen cannot be used the way I'm trying to use it? From the documentation, what I'm trying to accomplish doesn't seem to be a problem.
Any idea what I am missing?
TIA.
Edit: There are 2 ways I'm testing this:
A test method (using TestNG):
    Observable<Integer> source =
        Observable.just("test")
            .map(
                x -> {
                  System.out.println("trying again");
                  return Integer.parseInt(x);
                });
    source
        .retryWhen(
            errors -> {
              return errors
                  .zipWith(Observable.range(1, 3), (n, i) -> i)
                  .flatMap(
                      retryCount -> {
                        return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
                      });
            })
        .subscribe(...);
From a Kafka consumer (using Spring boot):
This is only the subscription to the observable; the retry logic is what I described earlier in the post.
    @KafkaListener(topics = "${kafka.config.topic}")
    public void receive(String payload) {
      log.info("received payload='{}'", payload);
      service
          .updateMessage(payload)
          .subscribe(...)
          .dispose();
    }
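One thing that may be relevant to the difference between the blocking and non-blocking variants: `Observable.timer` without an explicit scheduler fires on RxJava's computation scheduler, not on the calling thread. A non-blocking `subscribe(...)` therefore returns immediately, and if the test method then ends, or the flow is disposed right away as in the Kafka listener, the delayed retry may never get a chance to run. The sketch below is only an analogy in plain Java, not RxJava code; `AsyncDelayDemo` and `runDelayed` are hypothetical names. It schedules a delayed task on a background daemon thread and reports whether the task ran before the method returned, with and without waiting on a `CountDownLatch`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class AsyncDelayDemo {

    // Schedules a delayed task on a daemon thread (an analogy for a timer that
    // fires on a background scheduler). Returns true if the task ran before
    // this method returned.
    public static boolean runDelayed(long delayMillis, boolean waitForCompletion) {
        ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "demo-timer");
                t.setDaemon(true);
                return t;
            });
        AtomicBoolean ran = new AtomicBoolean(false);
        CountDownLatch done = new CountDownLatch(1);

        scheduler.schedule(() -> {
            ran.set(true);
            done.countDown();
        }, delayMillis, TimeUnit.MILLISECONDS);

        if (waitForCompletion) {
            try {
                // Keep the caller around until the delayed task has fired.
                done.await(delayMillis + 2000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        boolean result = ran.get();
        // Cancels any still-pending task, loosely comparable to disposing a
        // subscription right after subscribing.
        scheduler.shutdownNow();
        return result;
    }

    public static void main(String[] args) {
        System.out.println("without waiting: " + runDelayed(500, false));
        System.out.println("with waiting:    " + runDelayed(500, true));
    }
}
```

If this analogy holds, a test would need some way to wait for the terminal event (a latch, or a blocking call used only in the test), and the listener would need to keep the `Disposable` alive instead of calling `dispose()` immediately. Whether that is the actual cause here is an assumption.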