If your ultimate goal is "answering queries about the last N events", then you can write a scanning Flow that uses a circular buffer:
import scala.collection.immutable
type CircularBuffer[T] = immutable.Vector[T]
def emptyCircularBuffer[T] : CircularBuffer[T] = immutable.Vector.empty[T]
def addToCircularBuffer[T](maxSize : Int)(buffer : CircularBuffer[T], item : T) : CircularBuffer[T] =
  if (maxSize > 0)
    buffer.drop(buffer.size - maxSize + 1) :+ item
  else
    buffer
import akka.stream.scaladsl.Flow
def circularBufferFlow[T](N : Int) =
  Flow[T].scan(emptyCircularBuffer[T])(addToCircularBuffer[T](N))
The buffer in this Flow holds between 0 and N elements, and the Flow emits a new buffer with each update:
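import akka.stream.scaladsl.{Sink, Source}
// Assumes an implicit ActorSystem (or Materializer) is in scope.
Source(1 to 10).via(circularBufferFlow[Int](3))
  .runWith(Sink.foreach(println))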
//Vector()
//Vector(1)
//Vector(1, 2)
//Vector(1, 2, 3)
//Vector(2, 3, 4)
//Vector(3, 4, 5)
//...
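If you want to check the buffer logic without running a stream, the same update function can be folded over a plain collection. This is only a sketch: `scanLeft` from the standard library stands in for the Flow's `scan`, and the object name `CircularBufferSketch` is made up for illustration.

```scala
object CircularBufferSketch {
  type CircularBuffer[T] = Vector[T]

  // Same update rule as above: drop the oldest element once the buffer is full.
  def addToCircularBuffer[T](maxSize: Int)(buffer: CircularBuffer[T], item: T): CircularBuffer[T] =
    if (maxSize > 0) buffer.drop(buffer.size - maxSize + 1) :+ item
    else buffer

  // scanLeft yields the seed first and then every intermediate buffer,
  // mirroring what Flow.scan emits downstream.
  val snapshots: List[CircularBuffer[Int]] =
    (1 to 5).toList.scanLeft(Vector.empty[Int])(addToCircularBuffer[Int](3))

  def main(args: Array[String]): Unit =
    snapshots.foreach(println)
}
```

No snapshot ever holds more than 3 elements, and the last one is `Vector(3, 4, 5)`.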
To get only the final buffer, run the stream with Sink.last, which materializes the whole flow into a Future that completes with the last emitted buffer.
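As a rough sketch of the Sink.last approach, assuming Akka 2.6 or later (where an implicit `ActorSystem` is enough to materialize a stream): the object name, the system name, and the 5 second timeout are placeholders, and blocking with `Await` is only there to keep the example short.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object LastBufferSketch {
  def addToCircularBuffer[T](maxSize: Int)(buffer: Vector[T], item: T): Vector[T] =
    if (maxSize > 0) buffer.drop(buffer.size - maxSize + 1) :+ item
    else buffer

  def circularBufferFlow[T](n: Int): Flow[T, Vector[T], NotUsed] =
    Flow[T].scan(Vector.empty[T])(addToCircularBuffer[T](n))

  def main(args: Array[String]): Unit = {
    // Assumption: Akka 2.6+, so the implicit system also provides the materializer.
    implicit val system: ActorSystem = ActorSystem("last-buffer-sketch")

    // Sink.last keeps only the final element and exposes it as a Future.
    val lastBuffer: Future[Vector[Int]] =
      Source(1 to 10)
        .via(circularBufferFlow[Int](3))
        .runWith(Sink.last[Vector[Int]])

    println(Await.result(lastBuffer, 5.seconds)) // Vector(8, 9, 10)
    system.terminate()
  }
}
```

Note that the Future only completes when the upstream Source completes, so this is useful for finite streams only.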
If, however, your goal is truly to reverse a stream, then I would consider this unwise: a stream can be infinite, so a cache holding every element would eventually consume all of the JVM's memory and crash the application.