5

I have some code that uses Monix Observable for stream processing of a file. To test this code, I'd like for the operations I do on the Observable to be type independent so I can also perform them on any other data structure like List. That why I have written the following code to abstract the underlying data structure:

def permutations[F[_] : Applicative : FunctorFilter : SemigroupK](chars: F[Char]): F[F[Char]] = {
  Range.inclusive('a', 'z').map(_.toChar)
    .map { c ⇒
      FunctorFilter[F].filter(chars)(Character.toLowerCase _ andThen (_ != c))
    }
    .map(Applicative[F].pure)
    .reduceLeft(SemigroupK[F].combineK)
}

The thing that bugs me, is that this code creates a lot of intermediary data-structures. Is there a type-class I could use that makes this process more efficient? Something that lifts one data-structure into another without too much overhead, like LiftIO but for collections of items?

Martijn
  • 2,268
  • 3
  • 25
  • 51
  • 2
    May I ask why you don't use `Observable.fromIterable(...)` for testing? Would this not mitigate any performance problems your current approach might have? – Markus Appel Dec 05 '18 at 10:05
  • @MarkusAppel It probably would but that would also mean that my test code has to start making use of these effects types like `Observable` and `Task` which is what I'm trying to avoid. Using an `Observable` in production is necessary because of the sheer volume of data, but in testing only a few cases are necessary for which a `List` suffices. – Martijn Dec 05 '18 at 10:10
  • 3
    Fair point. Just be aware that this kind of generic programming will always include a performance hit. – Markus Appel Dec 05 '18 at 10:20
  • By _intermediary structures_ do you mean the implicit instances that you need (`Applicative`, `FunctorFilter`, etc) or the ones created in-between the different transformations? If the latter, using 2 consecutive `map` operations will always do 2 passes on `List` (monix and others are able to optimise it). In the other hand, it looks like your _unfolding_ a collection to _fold_ it back again. You may be interested on looking at **Recursion Schemes** – Alonso Dominguez Dec 05 '18 at 12:01
  • @AlonsoDominguez The latter. I’m concerned about all the single element `List`s that will created and combined. That could add a lot of overhead. – Martijn Dec 05 '18 at 12:09
  • mmm, but similarly that code will create many single-element `Observable`, granted the overhead will be in the `combineK` operation since the one in `List` will defo be less efficient... you can improve that by changing `reduceLeft` for `reduceRight` since `List` concatenation has a O(n), where `n` is the size of the `List` in the left hand side... but after all, this more an inconvenience of the data structure you have chosen. You could also benchmark it agains a `Vector`... – Alonso Dominguez Dec 05 '18 at 12:25

2 Answers2

1

It does not look like cats has anything to offer for this. And monix is not better, it only implements a handful of typeclasses from cats.

So, my best guess would be defining such typeclasses yourself:

import monix.execution.Scheduler.Implicits.global
import cats._
import cats.implicits._
import monix.reactive._

object Test {

  def main(args: Array[String]): Unit = {

    println(permutations(List('a', 'b', 'c')))

    permutations(Observable('a', 'b', 'c')).foreach{c =>
      print("Observable(")
      c.foreach(c1 => print(c1 + " "))
      print(") ")
    }
  }

  def permutations[F[_] : Applicative](chars: F[Char])(implicit seq: Sequence[F], fil: Filter[F]): F[F[Char]] = {

    val abc = seq.fromIterable(
      Range.inclusive('a', 'z').map(_.toChar)
    )

    abc.map(c => fil.filter(chars)(_ != c))
  }

  trait Sequence[F[_]] {

    def fromIterable[A](f: Iterable[A]): F[A]
  }

  implicit val listSequence: Sequence[List] = new Sequence[List] {

    def fromIterable[A](f: Iterable[A]): List[A] = f.toList
  }

  implicit val observableSequence: Sequence[Observable] = new Sequence[Observable] {

    def fromIterable[A](f: Iterable[A]): Observable[A] = Observable.fromIterable(f)
  }

  trait Filter[F[_]] {

    def filter[A](fa: F[A])(f: A => Boolean): F[A]
  }

  implicit val observableFilterFunctor: Filter[Observable] = new Filter[Observable] {

    def filter[A](fa: Observable[A])(f: A => Boolean): Observable[A] =
      fa.filter(f)
  }

  implicit val listFilterFunctor: Filter[List] = new Filter[List] {

    def filter[A](fa: List[A])(f: A => Boolean): List[A] =
      fa.filter(f)
  }

}

Result:

List(List(b, c), List(a, c), List(a, b), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c))
Observable(b c ) Observable(a c ) Observable(a b ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) 

I sadly couldn't get this to work on scalafiddle or scastie, because both don't offer the correct cats (1.5.0) and monix (3.0.0-M3) versions.

I still hope this helps.

Markus Appel
  • 3,138
  • 1
  • 17
  • 46
0

Although, creating re-useable functions are useful, you can easily test Observable without doing that.

I would recommend to split the logic processing and the side effectful consumer

object StreamProcessing {
  def processItems(obs: Observable[Input]): Observable[Result] = ???
}

In prod, you would do

val eventsStream: Observable[Input] = ???
val eventsConsumer: Consumer[Input, Output] = ???

StreamProcessing(myEventsStream).consumeWith(eventsConsumer)

Then, in your test, you can just mock your test data, asserting the list result. Also, by testing Observable, you gain the ability to control time with TestScheduler, which makes testing a breeze.

implicit val sc = TestScheduler()

val testData: List[Input] = ???
val expected: List[Output] = ???

val res = StreamProcessing(Observable.fromIterable(testData))
  .toListL
  .runToFuture

sc.tick()

assert(res.value, Some(Success(expected))

atl
  • 326
  • 1
  • 9