What we need is a function that inserts a single delay at the end of a chunk stream, and pass that function to maps
.
delay
doesn't work here because it put delays between each yielded value. But we can do it easily using functions from Applicative
:
S.print
$ concats
$ S.maps (\s -> s <* liftIO (threadDelay 1000000))
$ chunksOf 2
$ S.each [1..10]
What is happening here? maps
applies a transformation to the "base functor" of the Stream
. In a "chunked stream" obtained with chunksOf
, that base functor is itself a Stream
. Also, the transformation must preserve the return value of the Stream
.
Stream
s can be sequenced with functions like (>>=) :: Stream f m a -> (a -> Stream f m b) -> Stream f m b
if the next stream depends on the final result of the previous one, or with functions like (<*) :: Stream f m a -> Stream f m b -> Stream f m a
if it doesn't. (<*)
preserves the return value of the first Stream
, which is what we want in this case.
We do not want to yield any more elements, but only to introduce a delay effect, so we simply liftIO
the effect into the Stream
monad.
Another way to insert delays after each yielded value of a Stream
is to zip it with an infinite list of delays:
delay' :: MonadIO m => Int -> Stream (Of a) m r -> Stream (Of a) m r
delay' micros s = S.zipWith const s (S.repeatM (liftIO (threadDelay micros)))