I have run into a behavior of RxScala Observables that surprised me. Consider the example below:

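import rx.lang.scala.Observable
import scala.concurrent.duration._

object ObservablesDemo extends App {

  // interval(d) emits 0, 1, 2, ... every d; map turns each tick into a label in seconds
  val oFast = Observable.interval(3.seconds).map(n => s"[FAST] ${n*3}")
  val oSlow = Observable.interval(7.seconds).map(n => s"[SLOW] ${n*7}")
  val oBoth = (oFast merge oSlow).take(8)

  // First subscription prints the items
  oBoth.subscribe(println(_))
  // Second subscription blocks the main thread until all 8 items have been emitted
  oBoth.toBlocking.toIterable.last

}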

The code merges the elements emitted by two observables. One emits slowly (every 7 seconds), the other quickly (every 3 seconds). For the sake of the question, assume we want to define these observables with the map function, mapping the numbers from the interval as shown above (as opposed to emitting items at the same rate from both observables and then filtering as needed).

The output of the code seems counterintuitive to me:

[FAST] 0
[FAST] 3
[SLOW] 0
[FAST] 6
[FAST] 9   <-- HERE
[SLOW] 7   <-- HERE
[FAST] 12
[FAST] 15

The problematic part is when the [FAST] observable emits 9 before the [SLOW] observable emits 7. I would expect 7 to be emitted before 9 as whatever is emitted on the seventh second should come before what is emitted on the ninth second.

How should I modify the code to achieve the intended behavior? I have looked into the RxScala documentation and have started my search with topics such as the different interval functions and the Scheduler classes but I'm not sure if it's the right place to search for the answer.

user24139
  • I would suggest using something like **fs2**, **Monix**, **AkkaStreams** or **ZIO** over **RxScala**. There is also the library that uses **Laminar** under the hood _(I believe it is called **outwatch**)_. – Luis Miguel Mejía Suárez Jun 21 '21 at 18:27
  • The thing is that your fast observable starts emitting at the `3rd` second and the slow one at the `7th` second. So `9` is emitted by fast at the `12th` second, whereas `7` is emitted by slow at the `14th` second. – sarveshseri Jun 21 '21 at 18:28
  • You need to use `Observable.interval(0.seconds, 3.seconds)` and `Observable.interval(0.seconds, 7.seconds)` to start your observables at the `0th` second. – sarveshseri Jun 21 '21 at 18:37
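
For reference, the suggestion in the last comment could look roughly like the sketch below. It assumes RxScala is on the classpath. The object name `ObservablesDemoZeroDelay` is made up for illustration, and printing through `toBlocking.toIterable` instead of a separate `subscribe` call is a choice of this sketch, not part of the comment.

```scala
import rx.lang.scala.Observable
import scala.concurrent.duration._

// Hypothetical name; a sketch of the zero-initial-delay variant from the comments.
object ObservablesDemoZeroDelay {

  // interval(initialDelay, period): with a 0-second initial delay, tick n fires
  // at roughly n * period seconds, so each label matches its emission time.
  val oFast: Observable[String] =
    Observable.interval(0.seconds, 3.seconds).map(n => s"[FAST] ${n * 3}")
  val oSlow: Observable[String] =
    Observable.interval(0.seconds, 7.seconds).map(n => s"[SLOW] ${n * 7}")

  val oBoth: Observable[String] = (oFast merge oSlow).take(8)

  def main(args: Array[String]): Unit = {
    // A single blocking subscription: prints each item as it arrives and
    // returns once the eighth item has been consumed.
    oBoth.toBlocking.toIterable.foreach(println)
  }
}
```

With a zero initial delay each label equals the second at which it is emitted, so `[SLOW] 7` arrives before `[FAST] 9`. Both observables fire at second 0, and the relative order of those two items is not guaranteed.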

1 Answer

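That looks like the way it should work: `interval(3.seconds)` emits its first item only after the first 3 seconds have elapsed, and `interval(7.seconds)` only after the first 7. The table below lists each second and the event emitted at it. You can verify this with TestObserver and TestScheduler, if they are available in RxScala. RxScala reached end of life in 2019, so keep that in mind too.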

Secs   Event
-----------------
1
2
3      [Fast] 0
4
5
6      [Fast] 3
7      [Slow] 0
8
9      [Fast] 6
10
11
12     [Fast] 9
13
14     [Slow] 7
15     [Fast] 12
16
17
18     [Fast] 15
19
20
21     [Fast] 18, [Slow] 14
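
The table can also be reproduced without RxScala. The following is a plain-Scala model of the timing arithmetic only. It assumes that tick n of an interval with initial delay d and period p fires at second d + n * p. The names `IntervalTimeline`, `Event`, `ticks` and `merged` are invented for this sketch.

```scala
// Plain-Scala model of the timing arithmetic; it does not use RxScala.
object IntervalTimeline {

  // One emission: the second at which it fires and the text it carries.
  final case class Event(second: Long, text: String)

  // Models interval(initialDelay, period).map(n => s"[$tag] ${n * period}"):
  // tick n fires at initialDelay + n * period seconds.
  def ticks(tag: String, initialDelay: Long, period: Long): Iterator[Event] =
    Iterator.from(0).map { n =>
      Event(initialDelay + n * period, s"[$tag] ${n * period}")
    }

  // Merges both sources by firing time and keeps the first `count` events.
  // The sort is stable, so on a tie this model lists FAST first; the real
  // merge gives no such guarantee.
  def merged(fastDelay: Long, slowDelay: Long, count: Int): List[Event] = {
    val fast = ticks("FAST", fastDelay, 3).take(count)
    val slow = ticks("SLOW", slowDelay, 7).take(count)
    (fast ++ slow).toList.sortBy(_.second).take(count)
  }

  def main(args: Array[String]): Unit = {
    // interval(3.seconds) and interval(7.seconds): the first tick comes
    // after one full period, i.e. initial delays of 3 and 7 seconds.
    merged(3, 7, 8).foreach(e => println(f"${e.second}%2d  ${e.text}"))
  }
}
```

Calling `merged(3, 7, 8)` yields the same order as the output in the question, with `[FAST] 9` at second 12 and `[SLOW] 7` at second 14. Calling `merged(0, 0, 8)` models the zero-initial-delay variant suggested in the comments, where every label equals its firing second.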
Daniel Hinojosa