
Newbie to streaming and Haskell here.

I've been playing around with the streaming library and I'm particularly interested in understanding the chunking functions. For example:

S.print $ S.delay 1.0 $ concats $ chunksOf 2 $ S.each [1..10]

Or:

S.print $ concats $ S.maps (S.delay 1.0) $ chunksOf 2 $ S.each [1..10]

Both of these introduce a delay after each element, but what I want is a delay after each chunk, in this case after every second element. I tried this, but it doesn't compile:

S.print $ concats $ S.delay 1.0 $ chunksOf 2 $ S.each [1..10]

How can I achieve this?

1 Answer

What we need is a function that inserts a single delay at the end of a chunk stream, which we then pass to maps.

delay doesn't work here because it puts a delay after every yielded value. But we can do it easily using functions from Applicative:

  S.print 
$ concats 
$ S.maps (\s -> s <* liftIO (threadDelay 1000000)) 
$ chunksOf 2 
$ S.each [1..10]
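For readers who want to compile the pipeline above, here is a minimal sketch with the imports it needs. The names afterEachChunk and demo are hypothetical helpers introduced for illustration, not part of the streaming library. The effect is passed in as a parameter, and lift is used in place of liftIO so that the helper works in any monad; both choices are assumptions of this sketch rather than anything the snippet above requires.

```haskell
import Control.Concurrent (threadDelay)
import Streaming (Of, Stream, chunksOf, concats, lift, maps)
import qualified Streaming.Prelude as S

-- Run `act` once after every chunk of `n` elements.
-- `afterEachChunk` is a hypothetical helper name, not a library function.
afterEachChunk :: Monad m => Int -> m () -> Stream (Of a) m r -> Stream (Of a) m r
afterEachChunk n act =
  concats . maps (\chunk -> chunk <* lift act) . chunksOf n

-- Prints the numbers 1 to 10, pausing for one second after every second one.
demo :: IO ()
demo = S.print $ afterEachChunk 2 (threadDelay 1000000) $ S.each [1 .. 10 :: Int]
```

Loading this in GHCi and running demo should show the pauses. Because the effect is a parameter, the same helper can run any other action after each chunk, such as flushing a buffer.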

What is happening here? maps applies a transformation to the "base functor" of the Stream. In a "chunked stream" obtained with chunksOf, that base functor is itself a Stream. Also, the transformation must preserve the return value of the Stream.

Streams can be sequenced with functions like (>>=) :: Stream f m a -> (a -> Stream f m b) -> Stream f m b if the next stream depends on the final result of the previous one, or with functions like (<*) :: Stream f m a -> Stream f m b -> Stream f m a if it doesn't. (<*) preserves the return value of the first Stream, which is what we want in this case.
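To make the difference concrete, here is a small sketch contrasting (<*) with (*>). The names keepResult and dropResult are hypothetical, chosen only for this illustration. The type signatures show why only the first is usable inside maps: the transformation passed to maps must return a stream with the same result type it was given.

```haskell
import Streaming (Of, Stream, lift)
import qualified Streaming.Prelude as S

-- (<*) runs the stream, then the effect, and keeps the stream's result `r`.
keepResult :: Monad m => Stream (Of a) m r -> m () -> Stream (Of a) m r
keepResult s act = s <* lift act

-- (*>) runs both as well, but returns the effect's result, so `r` is lost.
-- A function of this shape would be rejected as an argument to maps.
dropResult :: Monad m => Stream (Of a) m r -> m () -> Stream (Of a) m ()
dropResult s act = s *> lift act
```

In a chunked stream, the result r of each chunk is the remainder of the outer stream, so discarding it would throw away everything after the first chunk.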

We do not want to yield any more elements, but only to introduce a delay effect, so we simply liftIO the effect into the Stream monad.


Another way to insert a delay for each yielded value of a Stream is to zip it with an infinite stream of delays:

delay' :: MonadIO m => Int -> Stream (Of a) m r -> Stream (Of a) m r
delay' micros s = S.zipWith const s (S.repeatM (liftIO (threadDelay micros)))
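The definition above needs a few imports to compile. Here is a self-contained sketch that repeats it together with a small usage example; demoDelay is a hypothetical name and the half-second pause is an arbitrary choice. Although the stream of delays is infinite, zipWith stops as soon as the first stream ends, so the result is finite whenever the input is.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Pair every element with one delay effect and keep only the element.
delay' :: MonadIO m => Int -> Stream (Of a) m r -> Stream (Of a) m r
delay' micros s = S.zipWith const s (S.repeatM (liftIO (threadDelay micros)))

-- Prints three characters, with one half-second pause per element.
demoDelay :: IO ()
demoDelay = S.print $ delay' 500000 $ S.each "abc"
```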
danidiaz