Given xs: Seq[Source[A, Mat]] and a function that combines the individual materialized values into a single one, is it possible to merge xs into a single aggregate source which materializes into an aggregate Mat?
Consider this practical example:
Having N Kafka topics represented by N Sources of type Source[A, Consumer.Control], I want to combine (merge) them into a single Source[A, Consumer.Control] such that the resulting Consumer.Control handles all the original controls.
For 2 sources this is trivial. Having this:
import akka.Done
import akka.kafka.scaladsl.Consumer
import scala.concurrent.{ExecutionContext, Future}

// Delegates every call to both controls and completes once both have completed.
class ConsumerControlBoth(c1: Consumer.Control,
                          c2: Consumer.Control)
                         (implicit ec: ExecutionContext) extends Consumer.Control {
  override def stop(): Future[Done] = {
    (c1.stop().zip(c2.stop())).map(_ => Done)
  }
  override def shutdown(): Future[Done] = {
    (c1.shutdown().zip(c2.shutdown())).map(_ => Done)
  }
  override def isShutdown: Future[Done] = {
    (c1.isShutdown.zip(c2.isShutdown)).map(_ => Done)
  }
}
I can do this:
Source.combineMat(s0, s1)(Merge(_))(new ConsumerControlBoth(_, _))
It's tempting to just go with
// Fold over the tail so that the head is not merged with itself.
sources.tail.foldLeft(sources.head) { case (acc, src) =>
  Source.combineMat(acc, src)(Merge(_))(new ConsumerControlBoth(_, _))
}
However, I'm worried that since each combineMat tries to pull elements evenly from its 2 inputs, the nested merges might end up with an uneven distribution overall: an element would come from the last source with probability 1/2, from the second-to-last with probability 1/4, etc.
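To make the concern concrete, here is a small sketch that computes the share each source would get under that assumption. It is plain Scala with no Akka involved; FoldMergeShares and shares are names made up for this illustration, and the 50/50 split per binary merge is exactly the assumption in question, not a documented guarantee of Merge.

```scala
object FoldMergeShares {
  // Expected share of the output per source when n always-ready sources are
  // merged by a left fold of binary merges: ((s0 + s1) + s2) + ... + s(n-1).
  // Assumption: every binary merge takes half of its output from each input.
  def shares(n: Int): Vector[Double] = {
    require(n >= 2, "need at least two sources")
    Vector.tabulate(n) { i =>
      // Source i (i >= 1) enters at fold step i and is halved once per step
      // from there on. Source 0 enters together with source 1, so it sits at
      // the same depth and gets the same share.
      val depth = if (i == 0) n - 1 else n - i
      math.pow(0.5, depth)
    }
  }
}
```

For four sources, FoldMergeShares.shares(4) gives 1/8, 1/8, 1/4 and 1/2, so if the assumption holds the last source would dominate the merged stream.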
On the other hand, there are vararg methods for combining sources that ignore materialized values, e.g. Source.combine, which returns Source[A, NotUsed]. I haven't been able to figure out how to adapt that to produce a combined materialized value.
Am I right in this assumption, or would the result be uniform? How do I do this properly in the generic case?
UPD. I've just come up with this (just a POC, no sanity checks etc.):
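import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Merge, Source}
import cats.Monoid // assumption: Monoid is the cats one (empty/combine)

def merge[A, M](xs: List[Source[A, M]])
               (implicit
                mat: Materializer,
                M: Monoid[M]): Source[A, M] = {
  require(xs.lengthCompare(2) >= 0, "requires at least 2 sources")
  val seed: (M, List[Source[A, NotUsed]]) = (M.empty, List.empty[Source[A, NotUsed]])
  // Named combinedMat so that it does not shadow the implicit Materializer `mat`,
  // which preMaterialize() needs.
  val (combinedMat, sourcesRev) = xs.foldLeft(seed) { case ((ma, sa), s) =>
    val (mMat, mSrc) = s.preMaterialize()
    (M.combine(ma, mMat), mSrc :: sa)
  }
  val sources: List[Source[A, NotUsed]] = sourcesRev.reverse
  Source
    .combine(sources(0), sources(1), sources.drop(2): _*)(Merge(_))
    .mapMaterializedValue(_ => combinedMat)
}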
It doesn't seem to have the downsides mentioned above, but I'm not sure I like it. Any comments?
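For the Kafka case, the pairwise wrapper also extends to any number of controls, which is the kind of combine a merged Source[A, Consumer.Control] would need. A minimal sketch follows; Done and Control here are stand-ins defined locally (hypothetical, so the block compiles without Akka or Alpakka Kafka on the classpath), the real Consumer.Control may have further members, and ControlAll is a made-up name.

```scala
import scala.concurrent.{ExecutionContext, Future}

object ControlAllSketch {
  // Stand-ins (hypothetical) for akka.Done and akka.kafka.scaladsl.Consumer.Control.
  sealed trait Done
  case object Done extends Done

  trait Control {
    def stop(): Future[Done]
    def shutdown(): Future[Done]
    def isShutdown: Future[Done]
  }

  // Delegates every call to all wrapped controls and completes once all of
  // them have completed (or fails if any of them fails).
  final class ControlAll(controls: Seq[Control])
                        (implicit ec: ExecutionContext) extends Control {
    private def all(f: Control => Future[Done]): Future[Done] =
      Future.sequence(controls.map(f)).map(_ => Done)

    override def stop(): Future[Done] = all(_.stop())
    override def shutdown(): Future[Done] = all(_.shutdown())
    override def isShutdown: Future[Done] = all(_.isShutdown)
  }
}
```

With something like this, the combined materialized value is built once from the whole list rather than by nesting pairs.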