7

I have event stream as follows:

sealed trait Event

val eventStream: fs2.Stream[IO, Event] = //...

I want to group this events received within a single minute (i.e from 0 sec to 59 sec of every minute). This sounds pretty straightforward with fs2

val groupedEventsStream = eventStream groupAdjacentBy {event => 
    TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis())
}

The problem is that the grouping function is not pure. It uses currentTimeMillis. I can workaroud this as follows:

stream.evalMap(t => IO(System.currentTimeMillis(), t))
  .groupAdjacentBy(t => TimeUnit.MILLISECONDS.toMinutes(t._1))

The thing is that adds clumsy boilerplate with tuples I'd like to avoid. Is there any other solutions?

Or maybe using impure function is not that bad for such a case?

Some Name
  • 8,555
  • 5
  • 27
  • 77

1 Answers1

2

You could remove some of the boilerplate by using cats.effect.Clock:

def groupedEventsStream[A](stream: fs2.Stream[IO, A])
                          (implicit clock: Clock[IO], eq: Eq[Long]): fs2.Stream[IO, (Long, Chunk[(Long, A)])] =
  stream.evalMap(t => clock.realTime(TimeUnit.MINUTES).map((_, t)))
    .groupAdjacentBy(_._1)
mutantacule
  • 6,913
  • 1
  • 25
  • 39