How can I make the following function tail recursive and fully streaming?
import scala.collection.immutable.Stream.Empty // brings the Empty pattern into scope

def aggregate[A, B](in: Stream[A], predicate: Stream[A] => B): Stream[B] = {
  in match {
    case Empty => Stream.empty
    case head #:: Empty => predicate(in) #:: Stream.empty
    case head #:: tail => predicate(in) #:: aggregate(tail, predicate)
  }
}
The goal is to process the stream with some lookahead. As an example, let's say we have a stream of sequential numbers:
val s: Stream[Int] = Stream.from(0)
Now I'd like to calculate the sum of 5 sequential elements of this stream, beginning at every element. With the function above it would look like this:
val r: Stream[Int] = aggregate(s, _.take(5).sum)
This produces the stream 10, 15, 20, ... But this will break the stack on large input. How can I optimize this function?
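To make the intended behaviour concrete, here is a minimal sketch of one possible direction. It is not a verified fix for the stack problem. It assumes the standard library's `tails` method, which returns a lazy `Iterator` of the successive suffixes of the stream. The names `AggregateSketch` and `aggregateViaTails` are hypothetical and exist only for this illustration.

```scala
object AggregateSketch {
  // Hypothetical variant of `aggregate`: applies `f` to every non-empty
  // suffix of `in`, without any explicit recursion.
  def aggregateViaTails[A, B](in: Stream[A], f: Stream[A] => B): Stream[B] =
    in.tails                  // Iterator over in, in.tail, in.tail.tail, ...
      .takeWhile(_.nonEmpty)  // drop the final empty suffix of a finite stream
      .map(f)
      .toStream               // back to a lazily evaluated Stream

  // The example from the question: sums of 5 consecutive numbers.
  val s: Stream[Int] = Stream.from(0)
  val r: Stream[Int] =
    aggregateViaTails(s, (window: Stream[Int]) => window.take(5).sum)

  // Only the first three results are forced here.
  val firstThree: List[Int] = r.take(3).toList
}
```

The lambda's parameter type is written out explicitly so that the call does not depend on how the compiler infers `A` within a single parameter list. Note that keeping `s` in a `val` retains the head of the stream, so every element that gets evaluated stays memoized in memory. That concern is separate from stack depth.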