I have a stream of unordered measurements that I'd like to group by id into batches of a fixed size, so that I can persist them efficiently later:
val measurements = for {
  id <- Seq("foo", "bar", "baz")
  value <- 1 to 5
} yield (id, value)
fs2.Stream.emits(scala.util.Random.shuffle(measurements)).toVector
That is, instead of:
(bar,4)
(foo,5)
(baz,3)
(baz,5)
(baz,4)
(foo,2)
(bar,2)
(foo,4)
(baz,1)
(foo,1)
(foo,3)
(bar,1)
(bar,5)
(bar,3)
(baz,2)
I'd like to have the following structure for a batch size of 3:
(bar,[4,2,1])
(foo,[5,2,4])
(baz,[3,5,4])
(baz,[1,2])
(foo,[1,3])
(bar,[5,3])
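To make the intended grouping concrete, here is a minimal non-streaming sketch using plain Scala collections rather than FS2. The names `BatchByKey` and `batchByKey` are hypothetical, and the choice to emit a batch as soon as it fills up and to flush incomplete batches at the end is an assumption, so the order of the batches may differ from the listing above.

```scala
object BatchByKey {
  // Hypothetical helper (not part of FS2): groups (key, value) pairs into
  // per-key batches of at most `size` values, preserving arrival order
  // within each key.
  // Assumption: a batch is emitted as soon as it is full, and incomplete
  // batches are flushed once the input is exhausted.
  def batchByKey[K, V](items: Seq[(K, V)], size: Int): Vector[(K, Vector[V])] = {
    require(size > 0, "size must be positive")
    val zero = (Map.empty[K, Vector[V]], Vector.empty[(K, Vector[V])])
    val (pending, emitted) = items.foldLeft(zero) {
      case ((buffers, out), (key, value)) =>
        val buffer = buffers.getOrElse(key, Vector.empty[V]) :+ value
        if (buffer.size == size) (buffers - key, out :+ (key -> buffer)) // full: emit
        else (buffers.updated(key, buffer), out)                         // keep buffering
    }
    // Flush whatever is left; the order of these leftovers is not guaranteed.
    emitted ++ pending.toVector
  }
}
```

Calling `BatchByKey.batchByKey(measurements, 3)` on the shuffled input should yield the same batches as above, although this version needs the whole input in memory and is therefore only a specification of the behaviour, not a streaming solution.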
Is there a simple, idiomatic way to achieve this in FS2? I know there's a groupAdjacentBy function, but it only groups neighbouring items, so it won't collect values for the same id that are scattered across the stream.
I'm on FS2 0.10.5 at the moment.