To count the elements in a stream, one must run the stream. One approach is to broadcast the stream elements to two sinks: one sink is the result of the main processing, the other sink simply counts the number of elements. Here is a simple example, which uses a graph to obtain the materialized values of both sinks:
val sink1 = Sink.foreach(println)
val sink2 = Sink.fold[Int, ByteString](0)((acc, _) => acc + 1)
val g = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) { implicit builder =>
(s1, s2) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[ByteString](2))
val source: Source[ByteString, NotUsed] =
Source(1 to 10)
.map(i => List(i.toString))
.via(CsvFormatting.format())
source ~> broadcast.in
broadcast.out(0) ~> s1
broadcast.out(1) ~> s2
ClosedShape
}) // RunnableGraph[(Future[Done], Future[Int])]
val (fut1, fut2) = g.run()
fut2 onComplete {
case Success(count) => println(s"Number of elements: $count")
case Failure(_) =>
}
In the above example, the first sink just prints the stream elements and has a materialized value of type Future[Done]
. The second sink does a fold operation to count the stream elements and has a materialized value of type Future[Int]
. The following is printed:
ByteString(49, 13, 10)
ByteString(50, 13, 10)
ByteString(51, 13, 10)
ByteString(52, 13, 10)
ByteString(53, 13, 10)
ByteString(54, 13, 10)
ByteString(55, 13, 10)
ByteString(56, 13, 10)
ByteString(57, 13, 10)
ByteString(49, 48, 13, 10)
Number of elements: 10
Another option for sending stream elements to two different sinks, while retaining their respective materialized values, is to use alsoToMat
:
val sink1 = Sink.foreach(println)
val sink2 = Sink.fold[Int, ByteString](0)((acc, _) => acc + 1)
val (fut1, fut2) = Source(1 to 10)
.map(i => List(i.toString))
.via(CsvFormatting.format())
.alsoToMat(sink1)(Keep.right)
.toMat(sink2)(Keep.both)
.run() // (Future[Done], Future[Int])
fut2 onComplete {
case Success(count) => println(s"Number of elements: $count")
case Failure(_) =>
}
This produces the same result as the graph example described earlier.