I have event stream as follows:
sealed trait Event
val eventStream: fs2.Stream[IO, Event] = //...
I want to group this events received within a single minute (i.e from 0 sec to 59 sec of every minute). This sounds pretty straightforward with fs2
val groupedEventsStream = eventStream groupAdjacentBy {event =>
TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis())
}
The problem is that the grouping function is not pure. It uses currentTimeMillis
. I can workaroud this as follows:
stream.evalMap(t => IO(System.currentTimeMillis(), t))
.groupAdjacentBy(t => TimeUnit.MILLISECONDS.toMinutes(t._1))
The thing is that adds clumsy boilerplate with tuples I'd like to avoid. Is there any other solutions?
Or maybe using impure function is not that bad for such a case?