0

I have a PersistentActor that persists events and I would like to read them in reverse order. More specifically, I would like to get a Source for the events that emits the events in reversed order. This is useful for answering queries about the last N events that happened for that PersistentActor. What is the right approach for this problem? This information should be kept cached in the view or there is a way that I can lazily query about the events in reverse order?

Thanks!

ale64bit
  • 6,232
  • 3
  • 24
  • 44

1 Answers1

1

If your ultimate goal is "answering queries about the last N events" then you can write a scaning Flow using a circular buffer:

import scala.collection.immutable

type CircularBuffer[T] = immutable.Vector[T]

def emptyCircularBuffer[T] : CircularBuffer[T] = immutable.Vector.empty[T]

def addToCircularBuffer[T](maxSize : Int)(buffer : CircularBuffer[T], item : T) : CircularBuffer[T]  =
  if(maxSize > 0)
    buffer.drop(buffer.size - maxSize + 1) :+ item
  else 
    buffer

import akka.stream.scaladsl.Flow

def circularBufferFlow[T](N : Int) = 
  Flow[T].scan(emptyCircleBuffer[T])(addToCircleBuffer[T](N))

This Flow will hold between 0 and N elements, and emits a new Buffer with each update:

Source(1 to 10).via(circularBufferFlow[Int](3))
               .runWith(Sink.foreach(println))

//Vector()
//Vector(1)
//Vector(1, 2)
//Vector(1, 2, 3)
//Vector(2, 3, 4)
//Vector(3, 4, 5)
//...

To get the very last buffer exclusively you can use Sink.last to materialize the entire flow into a Future.

If, however, your goal is truly to reverse a Stream then I would consider this unwise since a Stream could potentially go on for infinite which would result in your cache eventually consuming all of the JVM's memory and crashing the application.

Community
  • 1
  • 1
Ramón J Romero y Vigil
  • 17,373
  • 7
  • 77
  • 125