I want to create a custom StatefulStage that works like the groupBy method and emits Source[A, Unit] elements, but I don't understand how to create an instance of Source[A, Unit] and push incoming elements to it. Here is a stub:

class GroupBy[A, Mat]() extends StatefulStage[A, Source[A, Unit]] {
  override def initial: StageState[A, Source[A, Unit]] = new StageState[A, Source[A, Unit]] {
    override def onPush(elem: A, ctx: Context[Source[A, Unit]]): SyncDirective = {
      val source: Source[A, Unit] = ... // Need to create source here

      // and push `elem` to `source` here

      emit(List(source).iterator, ctx)
    }
  }
}

You can use the following snippet to test the GroupBy flow (it should print the events from each created stream):

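import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.duration.{FiniteDuration, SECONDS}
import scala.util.Random

case class Tick()
case class Event(timestamp: Long, sessionUid: String, traffic: Int)

implicit val system = ActorSystem()
import system.dispatcher

implicit val materializer = ActorMaterializer()

// Fixed seed so the generated sessions are reproducible between runs
val rnd = Random
rnd.setSeed(1)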

val eventsSource = Source
  .tick(FiniteDuration(0, SECONDS), FiniteDuration(1, SECONDS), () => Tick)
  .map {
    case _ => Event(System.currentTimeMillis / 1000, s"session-${rnd.nextInt(5)}", rnd.nextInt(10) * 10)
  }

val flow = Flow[Event]
  .transform(() => new GroupBy[Event, Unit])
  .map {
    (source) => source.runForeach(println)
  }

eventsSource
  .via(flow)
  .runWith(Sink.ignore)
  .onComplete(_ => system.shutdown())

Can anybody explain how to do this?

UPDATE:

I wrote the following onPush method based on this answer, but it didn't print any events. As I understand it, I can push an element to a source only while it is running as part of a flow, but I want to run the flow outside of GroupBy, in the test snippet. If I run the flow inside GroupBy, as in this example, it processes the events and sends them to Sink.ignore. I think this is the reason why my test snippet didn't print any events.

override def onPush(elem: A, ctx: Context[Source[A, Unit]]): SyncDirective = {
  val source: Source[A, ActorRef] = Source.actorRef[A](1000, OverflowStrategy.fail)
  val flow = Flow[A].to(Sink.ignore).runWith(source)

  flow ! elem

  emit(List(source.asInstanceOf[Source[A, Unit]]).iterator, ctx)
}

So, how can I fix it?
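To make the suspected cause concrete, here is a minimal sketch, assuming the same Akka Streams version as the snippets above. A Source is only a blueprint, and every materialization of Source.actorRef creates its own ActorRef, so an element sent to the ActorRef of one run is never seen by another run of the same Source. That appears to be what happens in the onPush attempt: the element goes to the run that ends in Sink.ignore, while the emitted Source is materialized again later by runForeach. The names BlueprintDemo and runTwice are made up for illustration.

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}

object BlueprintDemo {
  // Hypothetical helper: materializes the same blueprint twice and
  // returns the ActorRef produced by each materialization.
  def runTwice(blueprint: Source[String, ActorRef])
              (implicit materializer: ActorMaterializer): (ActorRef, ActorRef) = {
    // First run: elements end up in Sink.ignore, as in the onPush attempt.
    val first = blueprint.to(Sink.ignore).run()
    // Second run: an independent stream that prints what it receives.
    val second = blueprint
      .to(Sink.foreach[String](s => println(s"second stream got: $s")))
      .run()
    (first, second)
  }

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("blueprint-demo")
    implicit val materializer = ActorMaterializer()

    val blueprint = Source.actorRef[String](16, OverflowStrategy.fail)
    val (first, second) = runTwice(blueprint)

    first ! "sent to first"   // swallowed by Sink.ignore, never printed
    second ! "sent to second" // reaches only the second materialization

    Thread.sleep(1000)        // crude wait, for illustration only
    system.shutdown()         // assumption: older Akka; newer versions use terminate()
  }
}
```

If this reading is right, the stage would need to hand downstream something that is already wired to the place where the elements are pushed, rather than a blueprint that gets materialized a second time.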

  • [Here](https://groups.google.com/forum/#!topic/akka-user/LOv32nSSn0A) is the answer to this question from the Akka team. – Maxim Nov 26 '15 at 12:57