
Given xs: Seq[Source[A, Mat]] and a function to combine individual materialized values into a single one, is it possible to merge xs into a single aggregate source which materializes into an aggregate Mat?

Consider this practical example:

Having N Kafka topics represented by N Sources of type Source[A, Consumer.Control], I want to combine (merge) them into a single Source[A, Consumer.Control] such that the resulting Consumer.Control handles all the original controls.

For 2 sources this is trivial. Having this:

import akka.Done
import akka.kafka.scaladsl.Consumer
import scala.concurrent.{ExecutionContext, Future}

// Delegates every operation to both underlying controls and completes
// once both of them have completed.
class ConsumerControlBoth(c1: Consumer.Control,
                          c2: Consumer.Control)
                         (implicit ec: ExecutionContext) extends Consumer.Control {
  override def stop(): Future[Done] =
    c1.stop().zip(c2.stop()).map(_ => Done)

  override def shutdown(): Future[Done] =
    c1.shutdown().zip(c2.shutdown()).map(_ => Done)

  override def isShutdown: Future[Done] =
    c1.isShutdown.zip(c2.isShutdown).map(_ => Done)
}

I can do this:

Source.combineMat(s0, s1)(Merge(_))(new ConsumerControlBoth(_, _))

It's tempting to just go with

sources.tail.foldLeft(sources.head) { case (acc, src) =>
  Source.combineMat(acc, src)(Merge(_))(new ConsumerControlBoth(_, _))
}

However, I'm worried that since each combineMat attempts to distribute element fetching evenly between its 2 inputs, this might result in an uneven distribution overall: fetching an element from the last source would have probability 1/2, from the second-to-last 1/4, and so on.

On the other hand, there are vararg methods for combining sources that disregard materialized values, e.g. Source.combine, which returns a Source[A, NotUsed]. I haven't been able to figure out how to adapt that to produce a combined materialized value.

Am I right in this assumption, or would the result be uniform in that sense? How do I do this properly in the generic case?
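
As a way to check the interleaving empirically, here is a minimal sketch using plain in-memory sources in place of the Kafka ones (the ActorSystem name and the Keep.none usage are just for the experiment, since we only care about element order here):

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Merge, Sink, Source}

implicit val system: ActorSystem = ActorSystem("interleaving-check")
implicit val materializer: ActorMaterializer = ActorMaterializer()
import system.dispatcher

val sources: List[Source[Int, NotUsed]] =
  List(Source(List(0, 0, 0)), Source(List(1, 1, 1)), Source(List(2, 2, 2)))

// Left fold: each step wraps the accumulator in another binary Merge,
// so the last source sits one level deep while the first sits N-1 levels deep.
val folded: Source[Int, NotUsed] =
  sources.tail.foldLeft(sources.head) { case (acc, src) =>
    Source.combineMat(acc, src)(Merge(_))(Keep.none)
  }

// Flat vararg combine for comparison (materialized value is lost).
val flat: Source[Int, NotUsed] =
  Source.combine(sources(0), sources(1), sources.drop(2): _*)(Merge(_))

folded.runWith(Sink.seq).foreach(s => println(s"folded: $s"))
flat.runWith(Sink.seq).foreach(s => println(s"flat:   $s"))
// The flat version interleaves roughly round-robin (e.g. 0,1,2,0,1,2,0,1,2);
// the exact order of the folded version depends on how demand propagates.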

Update: I've just come up with this (just a POC, no sanity checks etc.):

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Merge, Source}
import cats.Monoid

def merge[A, M](xs: List[Source[A, M]])
                (implicit
                 mat: Materializer,
                 M: Monoid[M]): Source[A, M] = {
  require(xs.lengthCompare(2) >= 0, "works only for 2 or more sources")

  // Pre-materialize every source, combining the materialized values along the way.
  // The result is named combinedMat so it doesn't shadow the implicit Materializer.
  val seed: (M, List[Source[A, NotUsed]]) = (M.empty, List.empty[Source[A, NotUsed]])
  val (combinedMat, sourcesRev) = xs.foldLeft(seed) { case ((ma, sa), s) =>
    val (mMat, mSrc) = s.preMaterialize()
    (M.combine(ma, mMat), mSrc :: sa)
  }

  val sources: List[Source[A, NotUsed]] = sourcesRev.reverse

  Source
    .combine(sources(0), sources(1), sources.drop(2): _*)(Merge(_))
    .mapMaterializedValue(_ => combinedMat)
}

This doesn't seem to have the downsides mentioned above, but I'm not sure I like it. Any comments?
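
For the Kafka case, the missing piece is a Monoid[Consumer.Control] to feed into the merge above (the POC uses the cats Monoid API). Here is a sketch of one way to write it on top of ConsumerControlBoth, assuming the three members shown in that class are all that need implementing in the Alpakka Kafka version in use; the no-op identity element and the helper name are hypothetical:

import akka.Done
import akka.kafka.scaladsl.Consumer
import cats.Monoid
import scala.concurrent.{ExecutionContext, Future}

def consumerControlMonoid(implicit ec: ExecutionContext): Monoid[Consumer.Control] =
  new Monoid[Consumer.Control] {
    // Identity element: a control that is already stopped and shut down.
    override val empty: Consumer.Control = new Consumer.Control {
      override def stop(): Future[Done]     = Future.successful(Done)
      override def shutdown(): Future[Done] = Future.successful(Done)
      override def isShutdown: Future[Done] = Future.successful(Done)
    }

    // Combining two controls delegates to both, as ConsumerControlBoth does.
    override def combine(c1: Consumer.Control, c2: Consumer.Control): Consumer.Control =
      new ConsumerControlBoth(c1, c2)
  }

// Hypothetical usage with the merge POC above:
// implicit val controlM: Monoid[Consumer.Control] = consumerControlMonoid
// val all: Source[A, Consumer.Control] = merge(kafkaSources)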


1 Answer


It is possible to combine an arbitrary number of sources (the type of the materialized value has to be the same for all of them, though) like this:

import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{GraphDSL, MergeSorted, Source}
import akka.stream.stage.GraphStage
import akka.stream.{FanInShape2, SourceShape}

import scala.annotation.tailrec
import scalaz.{Ordering => _, _}

// Partition and PartitionPicker are types from this code base (not shown here):
// Partition is the element type, PartitionPicker a custom FanInShape2 stage.

def mergeWithPicker[A](originSources: Seq[Source[Partition, A]])(implicit monoid: Monoid[A], ord: Ordering[Partition]): Source[Partition, A] =
  merge(originSources, picker[A])

def mergeWithSorter[A](originSources: Seq[Source[Partition, A]])(implicit monoid: Monoid[A], ord: Ordering[Partition]): Source[Partition, A] =
  merge(originSources, sorter[A])

private def merge[A](originSources: Seq[Source[Partition, A]], f: (Source[Partition, A], Source[Partition, A]) => Source[Partition, A])(implicit monoid: Monoid[A]): Source[Partition, A] = originSources match {
  case Nil =>
    Source.empty[Partition].mapMaterializedValue(_ => monoid.zero)

  case sources =>
    // Reduce the sources pairwise, round after round, until one remains.
    @tailrec
    def reducePairs(sources: Seq[Source[Partition, A]]): Source[Partition, A] =
      sources match {
        case Seq(s) =>
          s

        case _ =>
          reducePairs(sources.grouped(2).map {
            case Seq(a)    => a
            case Seq(a, b) => f(a, b)
          }.toSeq)
      }

    reducePairs(sources)
}

private def picker[A](s1: Source[Partition, A], s2: Source[Partition, A])(implicit monoid: Monoid[A], ord: Ordering[Partition]): Source[Partition, A] =
  combineSources(new PartitionPicker[Partition], s1, s2)(monoid.append(_, _))

private def sorter[A](s1: Source[Partition, A], s2: Source[Partition, A])(implicit monoid: Monoid[A], ord: Ordering[Partition]): Source[Partition, A] =
  combineSources(new MergeSorted[Partition], s1, s2)(monoid.append(_, _))

private def combineSources[A, MatIn0, MatIn1, Mat](combinator: GraphStage[FanInShape2[A, A, A]], s0: Source[A, MatIn0], s1: Source[A, MatIn1])(combineMat: (MatIn0, MatIn1) => Mat): Source[A, Mat] =
  Source.fromGraph(GraphDSL.create(s0, s1)(combineMat) { implicit builder => (s0, s1) =>
    val merge = builder.add(combinator)
    s0 ~> merge.in0
    s1 ~> merge.in1
    SourceShape(merge.out)
  })

Later you can provide an implicit Monoid which describes how to merge the materialized values:

import akka.NotUsed

import scalaz.Monoid

object Monoids {

  implicit final val notUsedMonoid: Monoid[NotUsed] = new Monoid[NotUsed] {
    def zero: NotUsed = NotUsed

    def append(f1: NotUsed, f2: => NotUsed): NotUsed = f1
  }

  implicit def setMonoid[A]: Monoid[Set[A]] = new Monoid[Set[A]] {
    override def zero: Set[A] = Set.empty

    override def append(f1: Set[A], f2: => Set[A]): Set[A] = f1 ++ f2
  }

}
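
A hypothetical call site, assuming the Partition element type, the sources and an Ordering[Partition] from the code base above are in scope:

import akka.NotUsed
import akka.stream.scaladsl.Source
import Monoids._ // brings notUsedMonoid into implicit scope

// Plain sources without a useful materialized value:
val parts: Seq[Source[Partition, NotUsed]] = ???
implicit val ord: Ordering[Partition] = ???

val merged: Source[Partition, NotUsed] = mergeWithSorter(parts)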

You might also be interested in following this issue: https://github.com/akka/akka/issues/24369

  • This has the same downsides I mentioned in the question: non-uniformity of source selection. E.g. try merging `Source(List(0,0,0)), Source(List(1,1,1)), Source(List(2,2,2))` and feeding them to `Sink.toSeq`. This would result in `Seq(0,1,0,1,0,1,2,2,2)`. OTOH merging (w/o materializer) using Source.combine results in `Seq(0,1,2,0,1,2,0,1,2)`, which is what I want. – tkroman Feb 24 '18 at 23:13
  • In my example merge also takes an ordering, so the output would be Seq(2,2,2,1,1,1,0,0,0). If you want round-robin selection you would probably need a custom graph stage, similar to merge sort, which only switches which input it takes from (left/right). – llirik Feb 25 '18 at 19:16
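
For completeness, a minimal sketch of the kind of stage the last comment hints at (the RoundRobinMerge name is hypothetical and the code is untested, written against the Akka 2.5 GraphStage API): it strictly alternates between its two inputs and falls back to whichever input is still open once the other completes.

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FanInShape2, Inlet}

final class RoundRobinMerge[T] extends GraphStage[FanInShape2[T, T, T]] {
  override val shape = new FanInShape2[T, T, T]("RoundRobinMerge")
  private val in0 = shape.in0
  private val in1 = shape.in1
  private val out = shape.out

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Which input should be served next while both are still open.
      private var preferIn0 = true

      private def preferred: Inlet[T] = if (preferIn0) in0 else in1
      private def other: Inlet[T]     = if (preferIn0) in1 else in0

      private def pullIfPossible(in: Inlet[T]): Unit =
        if (!hasBeenPulled(in)) pull(in)

      setHandler(out, new OutHandler {
        override def onPull(): Unit =
          if (!isClosed(preferred)) pullIfPossible(preferred)
          else if (!isClosed(other)) pullIfPossible(other)
          else completeStage()
      })

      private def handlerFor(me: Inlet[T], that: Inlet[T]): InHandler = new InHandler {
        override def onPush(): Unit = {
          push(out, grab(me))
          preferIn0 = (me == in1) // alternate: prefer the other input next time
        }

        // If this input finishes while we are waiting on it, redirect the
        // outstanding demand to the remaining input (or finish the stage).
        override def onUpstreamFinish(): Unit =
          if (isClosed(that)) completeStage()
          else if (isAvailable(out)) pullIfPossible(that)
      }

      setHandler(in0, handlerFor(in0, in1))
      setHandler(in1, handlerFor(in1, in0))
    }
}

Since it is a GraphStage[FanInShape2[A, A, A]], such a stage could be plugged into the combineSources helper from the answer in place of MergeSorted, or reused pairwise in the same reducePairs scheme.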