So I know this has been asked many times before, but I have tried many things and nothing seems to work.

Let's start with these blogs/articles/code:

And many others.

In a nutshell, all of them describe how to use retryWhen to implement exponential back-off, something like this:

    source
        .retryWhen(
            errors -> {
              return errors
                  // pair each error with a retry number from 1 to 3
                  .zipWith(Observable.range(1, 3), (n, i) -> i)
                  .flatMap(
                      retryCount -> {
                        System.out.println("retry count " + retryCount);
                        return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
                      });
            })
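One detail in the snippet above is worth checking: Math.pow(1, retryCount) evaluates to 1 for every retryCount, so each retry waits one second rather than backing off exponentially. The following is a minimal sketch of the delay arithmetic only, with no RxJava involved. The class BackoffDelays and the helpers delaySeconds and schedule are hypothetical names introduced for illustration, and base 2 is just one possible choice.

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffDelays {

    // Hypothetical helper: delay in seconds before a given retry (1-based),
    // using the same expression as the snippet: (long) Math.pow(base, retryCount).
    static long delaySeconds(int base, int retryCount) {
        return (long) Math.pow(base, retryCount);
    }

    // Hypothetical helper: collects the delays for retries 1..maxRetries.
    static List<Long> schedule(int base, int maxRetries) {
        List<Long> delays = new ArrayList<>();
        for (int retry = 1; retry <= maxRetries; retry++) {
            delays.add(delaySeconds(base, retry));
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println("base 1: " + schedule(1, 3)); // prints "base 1: [1, 1, 1]"
        System.out.println("base 2: " + schedule(2, 3)); // prints "base 2: [2, 4, 8]"
    }
}
```

With base 1 the delay is constant. Any base greater than 1 gives a growing delay.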

Even the library's documentation shows the same retryWhen pattern: https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/core/Observable.java#L11919.

However, I've tried this and some very similar variations, not worth describing here, and nothing seems to work. The examples do work with a blocking subscriber, but I want to avoid blocking threads.

If we apply a blocking subscriber to the previous observable, like this:

    .blockingForEach(System.out::println);

It works as expected, but that's not what I'm after. If we instead try:

    .subscribe(
        x -> System.out.println("onNext: " + x),
        Throwable::printStackTrace,
        () -> System.out.println("onComplete"));

The flow runs only once, which is not what I want.

Does that mean retryWhen cannot be used the way I'm trying to use it? From the documentation, it doesn't look like this should be a problem.

Any idea what I am missing?

TIA.

Edit: There are 2 ways I'm testing this:

A test method (using TestNG):

    Observable<Integer> source =
        Observable.just("test")
            .map(
                x -> {
                  System.out.println("trying again");
                  return Integer.parseInt(x);
                });
    source
        .retryWhen(
            errors -> {
              return errors
                  .zipWith(Observable.range(1, 3), (n, i) -> i)
                  .flatMap(
                      retryCount -> {
                        return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
                      });
            })
        .subscribe(...);

From a Kafka consumer (using Spring Boot):

This is only the subscription to the observable; the retry logic is what I described earlier in the post.

    @KafkaListener(topics = "${kafka.config.topic}")
    public void receive(String payload) {
      log.info("received payload='{}'", payload);
      service
          .updateMessage(payload)
          .subscribe(...)
          .dispose();
    }
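A note on the listener above: calling dispose() on the Disposable returned by subscribe(...) cancels the chain, and that includes a retry timer that has not fired yet. The sketch below is only an analogy built on java.util.concurrent, not RxJava code. It schedules a delayed "retry" and optionally cancels it right away, which mirrors subscribe(...).dispose(). The class CancelRightAwayDemo, the method retryRan, and the 100 ms and 500 ms timings are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class CancelRightAwayDemo {

    // Schedules a delayed "retry" and reports whether it ran within 500 ms.
    // cancelImmediately = true mimics subscribe(...).dispose().
    static boolean retryRan(boolean cancelImmediately) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch ran = new CountDownLatch(1);
        try {
            ScheduledFuture<?> pending =
                scheduler.schedule(() -> ran.countDown(), 100, TimeUnit.MILLISECONDS);
            if (cancelImmediately) {
                pending.cancel(false); // the delayed task is dropped before it fires
            }
            // true if the task ran, false if the wait timed out
            return ran.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("cancelled right away, retry ran: " + retryRan(true));
        System.out.println("left running, retry ran: " + retryRan(false));
    }
}
```

If the same holds for the real listener, keeping the Disposable around (or simply not disposing it until the work is done) would let the delayed retries fire.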
Yuliban
  • Please format the code, it's hard to read. – Boris Feb 13 '20 at 15:10
  • Are you running this from a main method? RxJava uses daemon threads and if your main thread quits while there is background work going on, those threads will be terminated by the JVM. That's why examples use `blocking` methods. – akarnokd Feb 13 '20 at 19:37
  • I've updated the post to explain how I am calling the method. – Yuliban Feb 13 '20 at 19:56
  • You're facing the same problem as here: https://stackoverflow.com/questions/59005142/why-is-there-no-information-output-from-the-console-after-two-seconds-rxjava/59009512#59009512 – bubbles Feb 14 '20 at 07:49

1 Answer

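The main issue with your code is that Observable.timer operates on the computation scheduler by default. The retry delay therefore elapses on a background thread after subscribe has already returned, so a test has to either wait in real time or control the clock to verify the behaviour.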
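The effect of a timer that fires on a background thread can be illustrated without RxJava. The sketch below uses only java.util.concurrent and is an analogy, not the library's implementation. It schedules one delayed "retry" on a daemon thread and records the attempt count right after scheduling and again after waiting. The class DaemonTimerDemo, the method run, and the thread name are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DaemonTimerDemo {

    // Returns {attempts seen right after scheduling, attempts seen after waiting}.
    static int[] run(long delayMillis) {
        ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "background-timer");
                t.setDaemon(true); // a daemon thread does not keep the JVM alive
                return t;
            });
        AtomicInteger attempts = new AtomicInteger(1); // first attempt ran on the caller
        CountDownLatch retried = new CountDownLatch(1);
        try {
            scheduler.schedule(() -> {
                attempts.incrementAndGet(); // the delayed "retry"
                retried.countDown();
            }, delayMillis, TimeUnit.MILLISECONDS);

            int before = attempts.get();        // schedule() has already returned
            retried.await(5, TimeUnit.SECONDS); // wait for the background task
            return new int[] {before, attempts.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] counts = run(300);
        System.out.println("attempts right after scheduling: " + counts[0]);
        System.out.println("attempts after waiting: " + counts[1]);
    }
}
```

A caller that returns right after scheduling only ever observes the first attempt, which matches the "runs only once" symptom. Something has to keep the caller alive, or the clock has to be controlled, for the later attempts to be seen.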

Here is a unit test that verifies that your retry code is actually retrying:

  • It adds a counter, so we can easily check how many subscription attempts have happened.
  • It uses a TestScheduler instead of the computation scheduler, so we can simulate the passage of time with advanceTimeBy.

    TestScheduler testScheduler = new TestScheduler();
    AtomicInteger counter = new AtomicInteger();
    
    Observable<Integer> source =
        Observable.just("test")
            .map(
                x -> {
                    System.out.println("trying again");
                    counter.getAndIncrement();
                    return Integer.parseInt(x);
                });
    TestObserver<Integer> testObserver = source
        .retryWhen(
            errors -> {
                return errors
                    .zipWith(Observable.range(1, 3), (n, i) -> i)
                    .flatMap(
                        retryCount -> {
                            return Observable.timer((long) Math.pow(1, retryCount), SECONDS, testScheduler);
                        });
            })
        .test();
    
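    // the initial subscription runs immediately and fails
    assertEquals(1, counter.get());
    
    // each 1-second advance fires one timer, which triggers one retry
    testScheduler.advanceTimeBy(1, SECONDS);
    assertEquals(2, counter.get());
    
    testScheduler.advanceTimeBy(1, SECONDS);
    assertEquals(3, counter.get());
    
    testScheduler.advanceTimeBy(1, SECONDS);
    assertEquals(4, counter.get());
    
    // range(1, 3) is exhausted after three retries, so the sequence completes
    testObserver.assertComplete();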
    
cdehning