I'm trying to split an Observable of (String, Date) into two separate Observables and then zip them back together, as follows:

import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

val x = Observable.fromIterator((0 to 10).map(i => (s"a $i", s"b $i")).toIterator)

val y = Observable.toReactive(x)

val fileStream = Observable.fromReactivePublisher(y).mapAsync(5)(a => Task{println(a._1); a._1})
val dateStream = Observable.fromReactivePublisher(y).mapAsync(5)(a => Task{println(a._2); a._2})

fileStream.zip(dateStream)
  .map(println)
  .subscribe()

But I'm getting the following exception:

monix.reactive.exceptions.MultipleSubscribersException: InputStreamObservable does not support multiple subscribers
    at monix.reactive.exceptions.MultipleSubscribersException$.build(MultipleSubscribersException.scala:51)
    at monix.reactive.internal.builders.IteratorAsObservable.unsafeSubscribeFn(IteratorAsObservable.scala:42)
    at monix.reactive.Observable$$anon$6.subscribe(Observable.scala:155)
    at monix.reactive.internal.builders.ReactiveObservable.unsafeSubscribeFn(ReactiveObservable.scala:38)
    at monix.reactive.internal.operators.MapAsyncParallelObservable.unsafeSubscribeFn(MapAsyncParallelObservable.scala:60)
    at monix.reactive.internal.builders.Zip2Observable.unsafeSubscribeFn(Zip2Observable.scala:158)
    at monix.reactive.Observable$$anon$5.unsafeSubscribeFn(Observable.scala:139)
    at monix.reactive.Observable$class.subscribe(Observable.scala:71)
    at monix.reactive.Observable$$anon$5.subscribe(Observable.scala:136)
    at monix.reactive.Observable$class.subscribe(Observable.scala:90)
    at monix.reactive.Observable$$anon$5.subscribe(Observable.scala:136)
    at monix.reactive.Observable$class.subscribe(Observable.scala:120)
    at monix.reactive.Observable$$anon$5.subscribe(Observable.scala:136)
    at monix.reactive.Observable$class.subscribe(Observable.scala:112)
    at monix.reactive.Observable$$anon$5.subscribe(Observable.scala:136)
N A

2 Answers

Is the conversion to and from a Reactive Streams publisher mandatory?

One way to fix it is to build the source from an Iterable, which supports multiple subscribers: val x = Observable.fromIterable((0 to 10).map(i => (s"a $i", s"b $i"))). However, this will fail with an OutOfMemoryError for infinite streams.
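As a rough illustration of the fromIterable approach, here is a minimal sketch. It assumes Monix is on the classpath and, to stay short, uses plain map instead of mapAsync. The object name FromIterableSketch and the helper zipFromIterable are hypothetical names chosen for this example only.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, not part of Monix.
object FromIterableSketch {
  def zipFromIterable(): List[(String, String)] = {
    // An Iterable-backed source hands a fresh iterator to each subscriber,
    // so subscribing twice (once per branch) is allowed.
    val x = Observable.fromIterable((0 to 10).map(i => (s"a $i", s"b $i")))

    val fileStream = x.map(_._1)
    val dateStream = x.map(_._2)

    // Collect the zipped pairs; foreach returns a future that completes
    // when the stream completes.
    val results = ListBuffer.empty[(String, String)]
    val done = fileStream.zip(dateStream).foreach { p => results += p; () }
    Await.result(done, 5.seconds)
    results.toList
  }

  def main(args: Array[String]): Unit =
    zipFromIterable().foreach(println)
}
```

Note that the source is traversed twice here, once per branch, which is only acceptable when re-reading the underlying collection is cheap and repeatable.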

Another way is to use .multicast(Pipe.publish[]), which returns a ConnectableObservable, and then call connect() on it further down in the code, once both branches have subscribed:

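import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.{Observable, Pipe}

val x = Observable.fromIterator((0 to 10).map(i => (s"a $i", s"b $i")).iterator)

val y = Observable.toReactive(x)
val obsY = Observable.fromReactivePublisher(y)
// Share a single upstream subscription between all downstream subscribers
val connectY = obsY.multicast(Pipe.publish[(String, String)])

val fileStream = connectY.mapAsync(5)(a => Task{println(a._1); a._1})
val dateStream = connectY.mapAsync(5)(a => Task{println(a._2); a._2})

fileStream.zip(dateStream)
  .map(println)
  .subscribe()

// Start the source only after both branches have subscribed
connectY.connect()

// Keep the JVM alive while the stream is processed asynchronously
Thread.sleep(5000)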
  • I wonder what's the point of doing `toReactive` and then `fromReactivePublisher`? I.e. why not just `val connectY = x.multicast(Pipe.publish[(String, String)])`? – Sergey Romanovsky Jan 07 '21 at 23:38
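
The variant the comment describes, multicasting x directly without the toReactive / fromReactivePublisher round trip, might look like the sketch below. It assumes the Reactive Streams conversion is not needed for any other reason, uses fromIterable and plain map only to keep the example short, and the names DirectMulticastSketch and zipDirect are hypothetical.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.{Observable, Pipe}
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, not part of Monix.
object DirectMulticastSketch {
  def zipDirect(): List[(String, String)] = {
    val x = Observable.fromIterable((0 to 10).map(i => (s"a $i", s"b $i")))

    // Multicast the source directly; nothing is emitted until connect().
    val hot = x.multicast(Pipe.publish[(String, String)])

    val fileStream = hot.map(_._1)
    val dateStream = hot.map(_._2)

    // Subscribe both branches (through zip) before connecting.
    val results = ListBuffer.empty[(String, String)]
    val done = fileStream.zip(dateStream).foreach { p => results += p; () }

    // Start the shared upstream subscription.
    hot.connect()

    Await.result(done, 5.seconds)
    results.toList
  }

  def main(args: Array[String]): Unit =
    zipDirect().foreach(println)
}
```

Waiting on the future returned by foreach replaces the Thread.sleep call, since it completes when the zipped stream completes.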

In addition to sergei-shubin's answer, it's possible to temporarily transform an Observable into a "hot" observable that can be split into multiple streams using publishSelector without having to manually handle a multicast. This would look something like:

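import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

val x = Observable.fromIterator((0 to 10).map(i => (s"a $i", s"b $i")).toIterator)

// Inside the selector, o is a shared ("hot") view of x that can be subscribed to more than once
val zipped = x.publishSelector { o =>
  // Note: mapParallelUnordered does not preserve source order, so the two sides
  // of a zipped pair are not guaranteed to come from the same source element
  val fileStream = o.mapParallelUnordered(5)(a => Task{println(a._1); a._1})
  val dateStream = o.mapParallelUnordered(5)(a => Task{println(a._2); a._2})

  fileStream.zip(dateStream)
}

zipped
  .map(println)
  .subscribe()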
Klugscheißer