0

I currently have an event sourced service supported by Akka and Cassandra. This is a bidding system called AuctionService and at some point I need to retrieve the last bid event called BiddenOnLot. For that I use akka-persistence-query.

Here is my current code:

        // obtain read journal by plugin id
        val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](
          "cassandra-query-journal")

        // issue query to journal
        val source: Source[EventEnvelope, NotUsed] = readJournal.eventsByPersistenceId(self.path.name.toString, 0, Long.MaxValue)

        // materialize stream, consuming events
        implicit val mat = ActorMaterializer()

        source.runForeach(envelope ⇒ {
          if (envelope.event.isInstanceOf[BiddenOnLot]) {
            val biddenOnLot = envelope.event.asInstanceOf[BiddenOnLot]
            if (biddenOnLot.paddleId == paddleId) {
               // TODO: get last bid event by paddle id
            }
          }
        })

So far I loop through all events and can determine its type. But I'm really struggling to isolate the last bid event and be able to use it asynchronously. Any ideas?

Tíbó
  • 1,188
  • 13
  • 28

1 Answers1

0

You can do it by using pattern matching to deconstruct emitted EventEnvelope and map it the event you want:

source.map {
    case EventEnvelope(_, _, _, bidenOnLot: BidenOnLot) if bidenOnLot.paddleId == paddleId => bidenOnLot
}.runForeach(println)
Branislav Lazic
  • 14,388
  • 8
  • 60
  • 85