
I want to read multiple big files with Akka Streams and process each line. Imagine that each line consists of an (identifier -> value) pair. If a new identifier is found, I want to save it and its value in the database; otherwise, if the identifier has already been seen while processing the stream of lines, I want to save only the value. For that, I think I need some kind of recursive stateful flow that keeps the identifiers already found in a Map. I imagine this flow would receive pairs of (newLine, contextWithIdentifiers).

I've just started looking into Akka Streams. I think I can manage the stateless processing myself, but I have no clue how to keep the contextWithIdentifiers. I'd appreciate any pointers in the right direction.
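The "read multiple big files line by line" part can be done without any state. Below is a minimal sketch, assuming Akka 2.6+ and a hypothetical line format `identifier,value`; the object name `LinesFromFiles` and the `parse` helper are illustrative, not part of any library. It concatenates the files one after another with `flatMapConcat` and splits each byte stream on newlines with `Framing.delimiter`.

```scala
import java.nio.file.Path
import akka.NotUsed
import akka.stream.scaladsl.{FileIO, Framing, Source}
import akka.util.ByteString

object LinesFromFiles {
  // Hypothetical line format "identifier,value" (split on the first comma only).
  def parse(line: String): (String, String) = {
    val idx = line.indexOf(',')
    require(idx >= 0, s"malformed line: $line")
    (line.substring(0, idx), line.substring(idx + 1))
  }

  // Streams every line of every file, one file after another, as (identifier, value) pairs.
  def pairs(files: List[Path]): Source[(String, String), NotUsed] =
    Source(files)
      .flatMapConcat { path =>
        FileIO.fromPath(path)
          // split on '\n'; allowTruncation = true still emits a last line with no trailing newline
          .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8192, allowTruncation = true))
          .map(_.utf8String)
      }
      .filter(_.nonEmpty) // skip blank lines
      .map(parse)
}
```

`flatMapConcat` keeps the files strictly ordered and reads only one at a time, which matters when later stages depend on the order in which identifiers are first seen.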

Guillaume Massé
vicaba
  • I appreciate you asking this. It's such a simple request, yet finding a meaningful answer with sample code is surprisingly hard. This is the only one I found! – akauppi Nov 06 '16 at 15:03

2 Answers


Maybe something like statefulMapConcat can help you:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.util.Random
import scala.concurrent.ExecutionContext.Implicits.global

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

//encapsulating your input
case class IdentValue(id: Int, value: String)
//some randomly generated input with ids in 0..4
val identValues = List.fill(20)(IdentValue(Random.nextInt(5), "valueHere"))

val stateFlow = Flow[IdentValue].statefulMapConcat { () =>
  //state with already processed ids; a fresh set is created for each materialization
  var ids = Set.empty[Int]
  identValue =>
    if (ids.contains(identValue.id)) {
      //known id: save only the value to the DB
      println(identValue.value)
      List(identValue)
    } else {
      //new id: save both id and value to the DB
      println(identValue)
      ids = ids + identValue.id
      List(identValue)
    }
}

Source(identValues)
  .via(stateFlow)
  .runWith(Sink.seq)
  .foreach(result => println(result))
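A variation on the same idea: instead of doing the database writes inside the stateful stage, the stage can just decide *what* to write and emit it as a value, leaving the actual I/O to a later stage. This is a minimal sketch, assuming Akka 2.6+; `DbWrite`, `SaveIdAndValue`, `SaveValueOnly` and `Dedup` are hypothetical names for illustration.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

// Hypothetical commands describing what to persist for each (identifier, value) pair.
sealed trait DbWrite
final case class SaveIdAndValue(id: String, value: String) extends DbWrite
final case class SaveValueOnly(id: String, value: String) extends DbWrite

object Dedup {
  // The Set of seen identifiers lives inside the factory,
  // so each materialization of this flow starts with an empty Set.
  val flow: Flow[(String, String), DbWrite, NotUsed] =
    Flow[(String, String)].statefulMapConcat[DbWrite] { () =>
      var seen = Set.empty[String]
      pair =>
        pair match {
          case (id, value) if seen.contains(id) =>
            List(SaveValueOnly(id, value)) // identifier seen before: value only
          case (id, value) =>
            seen += id
            List(SaveIdAndValue(id, value)) // first occurrence: identifier and value
        }
    }
}
```

Downstream, a `mapAsync(1)` stage could call your database client for each `DbWrite`; with parallelism 1 the writes run one at a time in stream order, so the `SaveIdAndValue` for an identifier is persisted before any `SaveValueOnly` for it. Keeping the stage free of side effects also makes it easy to test on its own.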
Jeffrey Chung
rincewind
  • Thanks for the code. I would appreciate a bit more types in the middle, since there's a () => ... factory involved. Would you know why there isn't a `.statefulMap` method? – akauppi Nov 06 '16 at 15:11
  • My questions from 1.5 years back look childish now. Let me answer them. The factory approach does not complicate anything; it just means the code may get called multiple times (once per materialization). Trivial. There's no ".statefulMap" since the job of the code is to provide 0..n entries per incoming entry (the `List`s), and those entries get concatenated. Either I had a bad day in '16, or I've learned something since then. – akauppi Jul 20 '18 at 18:09
  • Wow, `statefulMapConcat` seems insanely versatile – ig-dev Apr 06 '19 at 21:38
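To see what "the factory may get called multiple times" means in practice, here is a small sketch, assuming Akka 2.6+ (object and value names are illustrative). The same flow blueprint is run twice: state created inside the factory is fresh for each run, while state captured from outside the factory is shared by every run.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FactoryPerMaterialization {
  // State created inside the factory: every materialization gets its own counter.
  val freshCounter = Flow[String].statefulMapConcat[String] { () =>
    var n = 0
    s => { n += 1; List(s"$n:$s") }
  }

  // State created outside the factory (for contrast): all materializations share it.
  var shared = 0
  val sharedCounter = Flow[String].statefulMapConcat[String] { () =>
    s => { shared += 1; List(s"$shared:$s") }
  }

  // Materializes the same blueprint twice, one run after the other.
  def runTwice(flow: Flow[String, String, _])(implicit system: ActorSystem): (Seq[String], Seq[String]) = {
    val src = Source(List("a", "b", "c")).via(flow)
    val first = Await.result(src.runWith(Sink.seq), 5.seconds)
    val second = Await.result(src.runWith(Sink.seq), 5.seconds)
    (first, second)
  }
}
```

With `freshCounter` both runs yield `1:a, 2:b, 3:c`; with `sharedCounter` the second run continues at `4:a`. That is why the set of seen identifiers in the question belongs inside the factory.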

A few years later, here is an implementation I wrote if you only need a 1-to-1 mapping (not 1-to-N):

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}

object StatefulMap {
  // `converter` is by-name, so it is re-evaluated (fresh state) for every materialization
  def apply[T, O](converter: => T => O) = new StatefulMap[T, O](converter)
}

class StatefulMap[T, O](converter: => T => O) extends GraphStage[FlowShape[T, O]] {
  val in = Inlet[T]("StatefulMap.in")
  val out = Outlet[O]("StatefulMap.out")
  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    // evaluated once per materialized stage instance
    val f = converter
    setHandler(in, new InHandler {
      override def onPush(): Unit = push(out, f(grab(in)))
    })
    setHandler(out, new OutHandler {
      override def onPull(): Unit = pull(in)
    })
  }
}

Test (and demo):

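// Assumes ScalaTest 3.1+ and Akka 2.6+ (an implicit ActorSystem provides the Materializer)
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import scala.concurrent.Await
import scala.concurrent.duration._

class StatefulMapSpec extends AnyFlatSpec with Matchers {
  implicit val system: ActorSystem = ActorSystem("StatefulMapSpec")

  behavior of "StatefulMap"

  // converter that returns how many elements it has seen so far
  class Counter extends (Any => Int) {
    var count = 0

    override def apply(x: Any): Int = {
      count += 1
      count
    }
  }

  it should "not share state among substreams" in {
    val result = Await.result(
      Source(0 until 10)
        .groupBy(2, _ % 2)
        .via(StatefulMap(new Counter()))
        .fold(Seq.empty[Int])(_ :+ _)
        .mergeSubstreams
        .runWith(Sink.seq),
      5.seconds)
    // each substream (evens, odds) counts 1 to 5 on its own Counter
    result.foreach(_ should be(1 to 5))
  }
}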
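For comparison, the same 1-to-1 behavior can also be sketched on top of `statefulMapConcat` by wrapping each result in a one-element `List`, without writing a custom `GraphStage`. This assumes Akka 2.6+; `StatefulMapViaConcat` is a hypothetical name. Newer Akka releases and Apache Pekko also include a built-in `statefulMap` operator, so check the documentation for your version before rolling your own.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

object StatefulMapViaConcat {
  // Hypothetical helper: a 1-to-1 stateful map built on statefulMapConcat.
  // `converter` is by-name and is evaluated inside the factory, so every
  // materialization (including each substream after groupBy) gets its own state.
  def apply[T, O](converter: => T => O): Flow[T, O, NotUsed] =
    Flow[T].statefulMapConcat[O] { () =>
      val f = converter
      t => List(f(t)) // exactly one output element per input element
    }
}
```

The trade-off is a small `List` allocation per element; the hand-written `GraphStage` avoids that, at the cost of more code.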
ig-dev