Your underlying problem is how to convert an Observable[ByteBuffer] into an Observable[String], where each String is a line, correct?
You can use the method bufferWithSelector(selector: Observable[S]): Observable[Seq[A]].
This method collects the elements of the source Observable into a buffer and emits that buffer as a Seq each time the selector Observable emits an element.
I made a small example using integers:
import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._
val source = Observable.range(0, 1000, 1)
  .delayOnNext(100.milliseconds)
val selector = source.filter(_ % 10 == 0)
val buffered = source.bufferWithSelector(selector)
  .map(_.foldLeft("")((s, i) => s + i.toString)) // Folds each buffered Seq into a String for display purposes
buffered.foreach(println)
Of course, this has a major drawback: the underlying Observable source is cold, and both the selector and bufferWithSelector subscribe to it, so it will be evaluated twice. You can see this by modifying the above example:
import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._
val source = Observable.range(0, 1000, 1)
  .delayOnNext(100.milliseconds)
  .map { x => println(x); x } // <------------------
val selector = source.filter(_ % 10 == 0)
val buffered = source.bufferWithSelector(selector)
  .map(_.foldLeft("")((s, i) => s + i.toString))
buffered.foreach(println)
This will print every number twice.
To fix this, you have to convert the source Observable into a hot Observable:
import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._
val source = Observable.range(0, 1000, 1)
  .delayOnNext(100.milliseconds)
  .map { x => println(x); x }
  .publish // <-----------------------------
// source is now a ConnectableObservable and will start emitting elements
// once you call source.connect()
val selector = source.filter(_ % 10 == 0)
val buffered = source.bufferWithSelector(selector)
  .map(_.foldLeft("")((s, i) => s + i.toString))
buffered.foreach(println)
source.connect() // <---------------------------
The only thing left to do is to modify the selector so that it emits an item only when a line feed is encountered.
I would suggest splitting the Observable[ByteBuffer] into an Observable[Byte] first (using flatMap) to avoid headaches.
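Put together, that could look roughly like the sketch below. I have not run it against your input, so treat it as a starting point. The names bytesOf, chunks, lineFeeds and lines are made up for illustration, the hard-coded chunks stand in for your real source, and UTF-8 is assumed as the encoding.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

// Hypothetical helper: copies the readable bytes out of a buffer.
// duplicate() is used so the caller's position is left untouched.
def bytesOf(buffer: ByteBuffer): List[Byte] = {
  val view  = buffer.duplicate()
  val bytes = new Array[Byte](view.remaining())
  view.get(bytes)
  bytes.toList
}

// Stand-in input (assumption); the real chunks would come from a file or socket.
val chunks: Observable[ByteBuffer] = Observable(
  ByteBuffer.wrap("first li".getBytes(UTF_8)),
  ByteBuffer.wrap("ne\nsecond line\n".getBytes(UTF_8))
)

// Flatten to single bytes and make the stream hot, so it is evaluated only once.
val bytes = chunks.flatMap(b => Observable.fromIterable(bytesOf(b))).publish

// The selector emits whenever a line feed passes through.
val lineFeeds = bytes.filter(_ == '\n'.toByte)

// Decode only after the bytes of a chunk are back together, so multi-byte
// characters that were split across ByteBuffers are reassembled first.
val lines: Observable[String] = bytes
  .bufferWithSelector(lineFeeds)
  .map(seq => new String(seq.toArray, UTF_8))

lines.foreach(println)
bytes.connect()
```

Whether the line feed byte lands at the end of one buffered chunk or at the start of the next depends on the order in which the buffer and the selector see it, so you will probably want to strip it from the resulting Strings (for example with trim or stripLineEnd) and drop empty results.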