I have an 'outer' Observable and a cache of 'inner' observables. Which 'inner' observable I select depends on the output of the 'outer' observable. When any of them change I want to combine their results. If 'outer' or 'inner' changes I want to combine the new values. I am trying to do this with flatMap but running into a problem of too many events being triggered (i.e. duplicates).
fun test() {
val outerObs = Observable.interval(3, TimeUnit.SECONDS)
val innerObsCache = mapOf(
0L to Observable.interval(0, 1, TimeUnit.SECONDS).map { "zero:$it" }.share(),
1L to Observable.interval(0, 1, TimeUnit.SECONDS).map { "one:$it" }.share()
)
outerObs
.flatMap { outerVal ->
innerObsCache[outerVal % 2]!!.map { "Outer($outerVal):Inner($it)" }
}.subscribe({ combinedResult -> println("$now $combinedResult") })
}
private val now get() = SimpleDateFormat("HH:mm:ss").format(Date())
This outputs something like
00:17:36 Outer(0):Inner(zero:0)
00:17:37 Outer(0):Inner(zero:1)
00:17:38 Outer(0):Inner(zero:2)
00:17:39 Outer(1):Inner(one:0)
00:17:39 Outer(0):Inner(zero:3)
You will notice that I am getting two outputs at 00:17:39
. What I really want to happen is output like this
00:17:36 Outer(0):Inner(zero:0)
00:17:37 Outer(0):Inner(zero:1)
00:17:38 Outer(0):Inner(zero:2)
00:17:39 Outer(1):Inner(one:0)
It seems like my problem is the inner observable at index 0 of my cache Observable map is still vending and thus causing the additional value to be sent. I don't see how to not have that happen given no observables are actually completing when I want to switch to the other one. As time goes on the problem gets worse because each outerObs vend causes even more undesired duplicates.
I am sure there is some other RX technique I should be using here to get what I expect but could use some guidance.