4

I'm using sttp client. I want to intepret response as strings divided by lines, eg Observable[String]

Here sttp streaming api:

import java.nio.ByteBuffer

import com.softwaremill.sttp._
import com.softwaremill.sttp.okhttp.monix.OkHttpMonixBackend
import monix.eval.Task
import monix.reactive.Observable

implicit val sttpBackend = OkHttpMonixBackend()

val res: Task[Response[Observable[ByteBuffer]]] = sttp
  .post(uri"someUri")
  .response(asStream[Observable[ByteBuffer]])
  .send()

So how I can get Observable[String]?

Here some ideas:

1. Is there a simple way to split Observable by lines?
2. Or maybe I can get raw InputStream from response, so I can easly split it, but I can't find way to use something like asStream[InputStream]
3. Or maybe just use http backend witout sttp layer?

zella
  • 4,645
  • 6
  • 35
  • 60
  • hey @zella! Have you been able to solve the problem? – Ernest Sadykov Jun 21 '19 at 13:12
  • 1
    @ErnestSadykov No, I didn't worked with sttp last time. But for example you can see here https://github.com/zella/rx-process2/blob/master/src/test/scala/com/github/zella/rxprocess2/RxNuProcessBuilderSpec.scala#L95 How to decode `Observable[Array[Bytes]]` to strings. rxjava `Flowable` easy converts to monix. – zella Jun 21 '19 at 14:35
  • 1
    Note, that `Strings` from link above has methods to split `Observable[String]` byLine. Example: https://github.com/zella/tuapse-play/blob/master/src/main/java/org/zella/tuapse/play/torrent/impl/DefaultWebTorrent.java#L41 – zella Jun 21 '19 at 15:35

1 Answers1

2

Your base problem is how to convert an Observable[ByteBuffer] into an Observable[String], where each String is a line, correct?

You can use the method bufferWithSelector(selector: Observable[S]): Observable[Seq[A]]. This method will buffer the Observable until the selector Observable emits an element.

I made a small example using Ints:

import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

val source = Observable.range(0, 1000, 1)
  .delayOnNext(100.milliseconds)

val selector = source.filter(_ % 10 == 0)

val buffered = source.bufferWithSelector(selector)
  .map(_.foldLeft("")((s, i) => s + i.toString)) // This folds the Seq[Int] into a String for display purposes

buffered.foreach(println)

Try it out!


Of course, this has a major drawback: The underlying Observable source will be evaluated twice. You can see this by modifying the above example:

// Start writing your ScalaFiddle code here

import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

val source = Observable.range(0, 1000, 1)
  .delayOnNext(100.milliseconds)
  .map {x => println(x); x}  // <------------------

val selector = source.filter(_ % 10 == 0)

val buffered = source.bufferWithSelector(selector)
  .map(_.foldLeft("")((s, i) => s + i.toString))

buffered.foreach(println)

This will print every number twice.


To fix this, you have to convert the source Observable into a hot Observable:

import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

val source = Observable.range(0, 1000, 1)
  .delayOnNext(100.milliseconds)
  .map {x => println(x); x}
  .publish // <-----------------------------

// source is now a ConnectableObservable and will start emitting elements
// once you call source.connect()

val selector = source.filter(_ % 10 == 0)

val buffered = source.bufferWithSelector(selector)
  .map(_.foldLeft("")((s, i) => s + i.toString))

buffered.foreach(println)

source.connect() // <---------------------------

Try it out!

The only thing you need to do is to modify the selector to emit items only when a line feed is encountered.

I would suggest splitting the Observable[ByteBuffer] into an Observable[Byte] first (using flatMap) to avoid headaches.

Markus Appel
  • 3,138
  • 1
  • 17
  • 46