1

I am trying to create an RxJava BlockingObservable that will emit the value of a variable every X milliseconds until (condition == true) or a timeout occurs.

The code below seems close to what I want, but it always emits ONCE and then exits. What's odd is that I have a condition in takeUntil() which will NEVER be true -- I'd expect this observable to emit continuously and eventually time out, but it doesn't.

What am I missing/doing wrong here?

Observable.fromCallable(() -> getSendWindow())
        .sample(10, TimeUnit.MILLISECONDS)
        .timeout(30, TimeUnit.SECONDS)
        .takeUntil(sendWindow -> 1==2)
        .doOnError(throwable -> log.warn("Timed out waiting for send window to clear. Giving up."))
        .doOnCompleted(() -> {
            log.info("Send window cleared");
        })
        .toBlocking().forEach(sendWindow -> log.info("sendWindow={}, getSendWindow());
Bertrand Martel
  • 42,756
  • 16
  • 135
  • 159
bitstream
  • 459
  • 9
  • 20
  • .sample does not do what you think it does. Sample rate limits the above Observable to (at most) once every 10 seconds. – Aron Dec 22 '16 at 02:51
  • According to the documentation: "The Sample operator periodically looks at an Observable and emits whichever item it has most recently emitted since the previous sampling." http://www.introtorx.com/Content/v1.0.10621.0/13_TimeShiftedSequences.html#Sample Does that not imply that it's going to "sample" at a given interval and emit what it sees at that time? If not, what's the correct approach? – bitstream Dec 22 '16 at 02:54
  • 1
    That is correct. However, during the sampling window, no item has been emitted. Therefore there is no "most recently emitted" item. – Aron Dec 22 '16 at 02:57
  • 1
    Possible duplicate of [Rx Observable emitting values periodically](http://stackoverflow.com/questions/24557153/rx-observable-emitting-values-periodically) – Yaroslav Stavnichiy Jan 11 '17 at 15:05

1 Answers1

2

.sample does not do what you think it does. Sample rate limits the above Observable to (at most) once every 10 seconds.

Observable.fromCallable() only emits an event once, then completes.

.sample() waits 10 seconds and emits the last event (if there is one), every 10 seconds. Therefore it only emits one event, when you attach it to an Observable that only has one event. Then it completes.

What you probably actually want (I'm a .net programmer, so excuse my casing etc) is this.

Edit: Thanks for @akanokd for telling me that java uses interval for repeated events.

Observable.interval(10, timeUnit.MILLISECONDS)
    .map(x -> getSendWindow())
    .takeUntil(sendWindow -> 1==2)
    .doOnError(throwable -> log.warn("Timed out waiting for send window to clear. Giving up."))
     .doOnCompleted(() -> {
            log.info("Send window cleared");
        })
    .toBlocking().forEach(sendWindow -> log.info("sendWindow={}, getSendWindow());

Feel free to edit this answer with the API calls to the JAVA specific version...

Aron
  • 15,464
  • 3
  • 31
  • 64
  • I made the changes for Java.. mainly just changing select() method to fromCallable() since select() doesn't exist in RxJava2. I re-added timeout() but the code still emits a single value and then exits. – bitstream Dec 22 '16 at 03:25
  • @bitstream Its been a long time since I've used Java, I actually think the correct name for "select" is "map" in java. – Aron Dec 22 '16 at 03:45
  • 1
    RxJava's `timer()` is one-shot only, you want `interval()`. – akarnokd Dec 22 '16 at 08:35
  • Awesome! using interval() and map() did the trick. Thank you so much! – bitstream Dec 22 '16 at 19:58