I am trying to create an RxJava BlockingObservable
that will emit the value of a variable every X milliseconds until (condition == true) or a timeout occurs.
The code below seems close to what I want, but it always emits ONCE and then exits. What's odd is that I have a condition in takeUntil()
which will NEVER be true -- I'd expect this observable to emit continuously and eventually time out, but it doesn't.
What am I missing/doing wrong here?
Observable.fromCallable(() -> getSendWindow())
.sample(10, TimeUnit.MILLISECONDS)
.timeout(30, TimeUnit.SECONDS)
.takeUntil(sendWindow -> 1==2)
.doOnError(throwable -> log.warn("Timed out waiting for send window to clear. Giving up."))
.doOnCompleted(() -> {
log.info("Send window cleared");
})
.toBlocking().forEach(sendWindow -> log.info("sendWindow={}, getSendWindow());