
I'm trying to transform a Source of Scala entities into a Source of ByteString with Alpakka's CsvFormatting, and I also want to count the number of elements in the initial stream. What is the best way to count the initialSource elements while keeping the result as a ByteString Source?

val initialSource: Source[SomeEntity, NotUsed] = Source.fromPublisher(publisher)
val csvSource: Source[ByteString, NotUsed] = initialSource
  .map(e => List(e.firstName, e.lastName, e.city))
  .via(CsvFormatting.format())

Asked by noname.404



To count the elements in a stream, the stream has to be run. One approach is to broadcast the stream elements to two sinks: one sink does the main processing, and the other simply counts the elements. Here is a simple example that uses the GraphDSL to obtain the materialized values of both sinks:

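import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.alpakka.csv.scaladsl.CsvFormatting
import akka.stream.scaladsl.{Broadcast, GraphDSL, RunnableGraph, Sink, Source}
import akka.util.ByteString
import scala.util.{Failure, Success}

// Akka 2.6+: the implicit ActorSystem provides the Materializer needed by run()
implicit val system: ActorSystem = ActorSystem("csv-count")
import system.dispatcher // ExecutionContext for onComplete

// sink1 prints each CSV line; sink2 counts the lines with a fold
val sink1 = Sink.foreach(println)
val sink2 = Sink.fold[Int, ByteString](0)((acc, _) => acc + 1)

val g = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) { implicit builder =>
  (s1, s2) =>
    import GraphDSL.Implicits._

    // fan each element out to both sinks
    val broadcast = builder.add(Broadcast[ByteString](2))

    val source: Source[ByteString, NotUsed] =
      Source(1 to 10)
        .map(i => List(i.toString))
        .via(CsvFormatting.format())

    source ~> broadcast.in
    broadcast.out(0) ~> s1
    broadcast.out(1) ~> s2
    ClosedShape
}) // RunnableGraph[(Future[Done], Future[Int])]

val (fut1, fut2) = g.run()

fut2 onComplete {
  case Success(count) => println(s"Number of elements: $count")
  case Failure(e)     => println(s"Stream failed: $e")
}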

In the above example, the first sink just prints the stream elements and has a materialized value of type Future[Done]. The second sink does a fold operation to count the stream elements and has a materialized value of type Future[Int]. The following is printed:

ByteString(49, 13, 10)
ByteString(50, 13, 10)
ByteString(51, 13, 10)
ByteString(52, 13, 10)
ByteString(53, 13, 10)
ByteString(54, 13, 10)
ByteString(55, 13, 10)
ByteString(56, 13, 10)
ByteString(57, 13, 10)
ByteString(49, 48, 13, 10)
Number of elements: 10

Another option for sending stream elements to two different sinks, while retaining their respective materialized values, is to use alsoToMat:

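import akka.actor.ActorSystem
import akka.stream.alpakka.csv.scaladsl.CsvFormatting
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString
import scala.util.{Failure, Success}

// Akka 2.6+: the implicit ActorSystem provides the Materializer needed by run()
implicit val system: ActorSystem = ActorSystem("csv-count")
import system.dispatcher // ExecutionContext for onComplete

val sink1 = Sink.foreach(println)
val sink2 = Sink.fold[Int, ByteString](0)((acc, _) => acc + 1)

val (fut1, fut2) = Source(1 to 10)
  .map(i => List(i.toString))
  .via(CsvFormatting.format())
  .alsoToMat(sink1)(Keep.right) // keep sink1's Future[Done]
  .toMat(sink2)(Keep.both)      // pair it with sink2's Future[Int]
  .run() // (Future[Done], Future[Int])

fut2 onComplete {
  case Success(count) => println(s"Number of elements: $count")
  case Failure(e)     => println(s"Stream failed: $e")
}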

This produces the same result as the graph example described earlier.
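Since the question asks to keep the result as a ByteString Source, alsoToMat can also be applied before the stream is run, so that the count becomes the source's materialized value instead of a separate sink's. The following is a minimal sketch assuming Akka 2.6+ with Alpakka CSV on the classpath; the SomeEntity case class, the three sample entities (standing in for Source.fromPublisher(publisher)), and the helper names countingCsvSource and runSample are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.csv.scaladsl.CsvFormatting
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object CountingCsvSource {
  // Hypothetical stand-in for the question's SomeEntity.
  final case class SomeEntity(firstName: String, lastName: String, city: String)

  // Counts the entities as they pass through and exposes the count as the
  // materialized value, while the stream itself still emits CSV ByteStrings.
  def countingCsvSource(entities: Source[SomeEntity, NotUsed]): Source[ByteString, Future[Int]] =
    entities
      .alsoToMat(Sink.fold[Int, SomeEntity](0)((acc, _) => acc + 1))(Keep.right)
      .map(e => List(e.firstName, e.lastName, e.city))
      .via(CsvFormatting.format[List[String]]())

  // Hypothetical helper: runs the source over sample data and returns
  // the count together with the decoded CSV lines.
  def runSample(): (Int, Seq[String]) = {
    // Assumes Akka 2.6+, where an implicit ActorSystem supplies the Materializer.
    implicit val system: ActorSystem = ActorSystem("csv-count-sketch")
    try {
      val initialSource: Source[SomeEntity, NotUsed] = Source(List(
        SomeEntity("Ada", "Lovelace", "London"),
        SomeEntity("Alan", "Turing", "Wilmslow"),
        SomeEntity("Grace", "Hopper", "Arlington")))

      // Nothing is counted until the source runs; here it is drained into a Seq.
      val (countF, bytesF) =
        countingCsvSource(initialSource).toMat(Sink.seq)(Keep.both).run()

      val count = Await.result(countF, 5.seconds)
      val lines = Await.result(bytesF, 5.seconds).map(_.utf8String)
      (count, lines)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val (count, lines) = runSample()
    println(s"Number of elements: $count")
    lines.foreach(print)
  }
}
```

Because the count is part of the source's materialized value, each materialization of the returned source produces its own Future[Int], and that future only completes once the run it belongs to has finished.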

Answered by Jeffrey Chung