I am new to RxScala Observables and am experiencing strange behaviour when using a combination of take(n) and doOnCompleted().

Below is a test with two chains. I believe the first (with take(2) at the start) is correct: it logs the subscribe, next, completed and unsubscribe events. The second (with take(2) after doOnCompleted), however, never reaches the doOnCompleted action.

import rx.lang.scala.Observable

object Tester extends App {

    val obs = Observable.from(List(1,2,3,4))

    val obsAddMethodsCorrect = obs.take(2)
        .doOnSubscribe( println("subscribe") )
        .doOnNext( n => println(s"next $n") )
        .doOnError( e => println("error") )
        .doOnCompleted( println("completed") )
        .doOnUnsubscribe( println("unsubscribe") )

    val obsAddMethodsInCorrect = obs
        .doOnError( e => println("error") )
        .doOnCompleted( println("completed") )
        .take(2)
        .doOnNext( n => println(s"next $n") )
        .doOnUnsubscribe( println("unsubscribe") )
        .doOnSubscribe( println("subscribe") )

    obsAddMethodsCorrect.toBlocking.subscribe()
    println("")
    println("The above seems correct. Below seems incorrect")
    println("")
    obsAddMethodsInCorrect.toBlocking.subscribe()

}

Current output of the above test:

subscribe
next 1
next 2
completed
unsubscribe

The above seems correct. Below seems incorrect

subscribe
next 1
next 2
unsubscribe

Why doesn't doOnCompleted() get fired in the second example?

Abhishek Jain
user499882

1 Answer

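The doOnCompleted() operator invokes its action when an onCompleted() event reaches it. However, when you unsubscribe from an observable before it has finished emitting items, onCompleted() is never fired; the observer chain is effectively cancelled. In the second example, take(2) unsubscribes from the source after two items, so the doOnCompleted() placed before it never sees a completion.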
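To make the effect of the operator order easier to see, here is a minimal sketch that records events into a buffer instead of printing them. It uses only the RxScala calls already shown in the question; the object name `TakePlacement`, the two method names and the `record` helper are made up for illustration. Going by the output reported in the question, the first method should return only the two `next` events, and the second should also include `completed`.

```scala
import rx.lang.scala.Observable
import scala.collection.mutable.ListBuffer

// Hypothetical helper object, not part of RxScala.
object TakePlacement {

  // doOnCompleted sits upstream of take(2): take unsubscribes from the
  // source after two items, so this action is expected never to run.
  def completedBeforeTake(): List[String] = {
    val events = ListBuffer.empty[String]
    def record(e: String): Unit = { events += e }

    Observable.from(List(1, 2, 3, 4))
      .doOnCompleted(record("completed"))
      .take(2)
      .doOnNext(n => record(s"next $n"))
      .toBlocking
      .subscribe()

    events.toList
  }

  // doOnCompleted sits downstream of take(2): it observes the
  // onCompleted() that take itself emits after the second item.
  def completedAfterTake(): List[String] = {
    val events = ListBuffer.empty[String]
    def record(e: String): Unit = { events += e }

    Observable.from(List(1, 2, 3, 4))
      .take(2)
      .doOnCompleted(record("completed"))
      .doOnNext(n => record(s"next $n"))
      .toBlocking
      .subscribe()

    events.toList
  }
}
```

Calling `TakePlacement.completedBeforeTake()` and `TakePlacement.completedAfterTake()` side by side shows that the only difference between the two chains is which side of `take(2)` the `doOnCompleted` action is attached to.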

Bob Dalgleish
  • Sorry maybe I don't quite understand, but how does moving the position of where take is called change the result? For example, obsAddMethodsCorrect enters doOnCompleted – user499882 Apr 27 '18 at 09:36
  • There are 2 parts to the observer chain: the part before and the part after the `take()` operator. The `take()` operator will unsubscribe from the source observable before `onCompleted()` happens. Downstream, the observable created by the `take()` operator will emit the requested number of items followed by `onCompleted()`. The first part of the chain won't see the `onCompleted()` event. – Bob Dalgleish Apr 27 '18 at 14:09