A simplified (non-effectful) definition of Stream from the streaming library reads like this:

data Stream f = Step (f (Stream f))
              | Return

I'm trying to understand what the motivation for introducing this functor f is.

A typical such f is Of a, with Of defined as

data Of a b = !a :> b
    deriving Functor

When I read f as Of a in the definition of Stream, this sort of makes sense: Of a b is something like an a followed by more that is obtainable from b. With this interpretation, the f (Stream f) in the definition of Stream reads something like Of a (Stream (Of a)). But in this case a simpler version would have been

data Stream a = Step a (Stream a)
              | Return

I struggle to understand why the generalization with this functor f is used. In the introduction the author says that Stream

... can be used to stream successive distinct steps characterized by any functor f

and

the general type Stream f m r expresses a succession of steps ... with a shape determined by the 'functor' parameter f.

But in Streaming.Prelude the only functors I can find are Of a, Stream (Of a) m and Identity. The first one makes a stream of as, the second a stream of streams, the third achieves 'erasure'.

I really don't get it. I can achieve all of these things, i.e. simple stream of as, stream of streams, and erasure, without using this f in Stream.

What does this functor f do that can't be done otherwise?

mcmayer
1 Answer

A key feature of streaming's grouping functions is that they don't force you to keep entire groups in memory at any point. They perform grouping in a "streaming" fashion. The moment the start of a new group is detected, you can begin processing the group "downstream".

For example, imagine a function like lines :: Stream (Of Text) IO r -> Stream (Stream (Of Text) IO) IO r. That function will start a new group whenever it detects a newline in the stream, even if the end of the new line hasn't materialized yet.

The type Stream (Stream (Of Text) IO) IO r has the advantage that, to reach the line that comes after the current one, we must consume the current line completely. This is because the "rest of the stream" lies in the result value of the functor parameter. And Streams only give up their results once they have been exhausted.

A type like Stream (Of (Stream (Of a) IO ())) IO () would not force us to exhaust the current line before moving to the next one. We could simply ignore the inner Stream that was yielded and advance the outer Stream. But that "next line" might not even exist yet, as we haven't actually read what came before! Perhaps this scheme could be made to work somehow, but the semantics would be less clear and require more knowledge from the user.
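To make the difference between the two types concrete, here is a minimal pure sketch. It uses a simplified stand-in for Stream (no monad parameter, so the effects that make the issue pressing in IO are left out), and the names `fromList`, `toListOf`, `groupsToLists` and `skipGroup` are made up for illustration. The `breaks` below is a reduced imitation of the library function, not its actual implementation.

```haskell
module Main where

-- Simplified stand-in for the library type: no effects, but with a result r.
data Stream f r
  = Step (f (Stream f r))
  | Return r

data Of a b = !a :> b

-- Hypothetical helper: turn a list into an ungrouped stream.
fromList :: [a] -> Stream (Of a) ()
fromList = foldr (\a s -> Step (a :> s)) (Return ())

-- Simplified imitation of breaks: each group is an inner stream whose
-- *result* is the remainder of the outer stream.
breaks :: (a -> Bool) -> Stream (Of a) r -> Stream (Stream (Of a)) r
breaks isSep = outer
  where
    outer (Return r) = Return r
    outer s          = Step (inner s)
    inner (Return r) = Return (Return r)
    inner (Step (a :> rest))
      | isSep a   = Return (outer rest)
      | otherwise = Step (a :> inner rest)

-- Walk a group to its end; only then is the result handed over.
toListOf :: Stream (Of a) r -> ([a], r)
toListOf (Return r)         = ([], r)
toListOf (Step (a :> rest)) = let (as, r) = toListOf rest in (a : as, r)

-- The next group is only reachable through the result of the current one.
groupsToLists :: Stream (Stream (Of a)) r -> [[a]]
groupsToLists (Return _)   = []
groupsToLists (Step grp)   = let (as, rest) = toListOf grp
                             in as : groupsToLists rest

-- With groups as yielded *values*, the type lets us skip a group
-- without ever looking inside it.
skipGroup :: Stream (Of (Stream (Of a) ())) r -> Stream (Of (Stream (Of a) ())) r
skipGroup (Step (_ :> rest)) = rest
skipGroup done               = done

main :: IO ()
main = print (groupsToLists (breaks (== "-") (fromList ["1", "5", "-", "7"])))
-- prints [["1","5"],["7"]]
```

In the nested version there is no counterpart to `skipGroup`: the only way to get hold of `rest` is to traverse the inner stream down to its `Return`.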


Functions like mapped let us consume "downstream" groups which are defined "upstream". For example, this code reads strings from stdin, where the string "-" is a group separator. The downstream code parses the strings into Ints and shows a message whenever a 5 is encountered. Notice that messages are shown before the group is closed upstream.

    {-# LANGUAGE TypeApplications #-}  -- needed for `read @Int`
    import Streaming
    import qualified Streaming.Prelude as S

    example :: Stream (Of ()) IO ()
    example =
        let -- Upstream: split the lines of stdin into groups at each "-".
            upstream :: Stream (Stream (Of String) IO) IO ()
            upstream = S.breaks (=="-") S.stdinLn
            -- Downstream: collapse each group into a single () element.
            downstream :: Stream (Stream (Of String) IO) IO ()
                       -> Stream (Of ()) IO ()
            downstream = S.mapped handleGroup
            -- Consume one group; its result r is the rest of the outer stream.
            handleGroup :: Stream (Of String) IO r ->
                           IO (Of () r)
            handleGroup stream = do
                print "group began"
                r <- S.effects
                   . S.chain (\_ -> putStrLn "found!")
                   . S.filter (==5)
                   . S.map (read @Int)
                   $ stream
                print "group ended"
                return (() :> r)
         in downstream upstream

    main :: IO ()
    main = S.effects example
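As a rough sketch of why the functor parameter pays off here, a function like mapped can be written once and then used on both ungrouped and grouped streams. The Stream type below is a simplified stand-in rather than the library's definition, and `each`, `toList`, `yieldGroup`, `showElem` and `sumGroup` are hypothetical helpers written for this example.

```haskell
{-# LANGUAGE RankNTypes #-}
module Main where

-- Simplified stand-in for the library type, with effects and a result.
data Stream f m r
  = Step (f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

data Of a b = !a :> b

instance Functor (Of a) where
  fmap f (a :> b) = a :> f b

instance (Functor f, Functor m) => Functor (Stream f m) where
  fmap f (Return r) = Return (f r)
  fmap f (Effect m) = Effect (fmap (fmap f) m)
  fmap f (Step s)   = Step (fmap (fmap f) s)

-- One definition, usable for any functor f.
mapped :: (Monad m, Functor f)
       => (forall x. f x -> m (g x)) -> Stream f m r -> Stream g m r
mapped phi = go
  where
    go (Return r) = Return r
    go (Effect m) = Effect (fmap go m)
    go (Step f)   = Effect (fmap Step (phi (fmap go f)))

each :: [a] -> Stream (Of a) m ()
each = foldr (\a s -> Step (a :> s)) (Return ())

toList :: Monad m => Stream (Of a) m r -> m [a]
toList (Return _)         = return []
toList (Effect m)         = m >>= toList
toList (Step (a :> rest)) = (a :) <$> toList rest

-- Prepend one group; the rest of the outer stream becomes the group's result.
yieldGroup :: Functor m
           => [a] -> Stream (Stream (Of a) m) m r -> Stream (Stream (Of a) m) m r
yieldGroup xs rest = Step (rest <$ each xs)

-- Ungrouped use: f = Of Int, g = Of String.
showElem :: Monad m => Of Int x -> m (Of String x)
showElem (a :> x) = return (show a :> x)

-- Grouped use: f = Stream (Of Int) m, g = Of Int.
sumGroup :: Monad m => Stream (Of Int) m x -> m (Of Int x)
sumGroup = go 0
  where
    go acc (Return x)         = return (acc :> x)
    go acc (Effect m)         = m >>= go acc
    go acc (Step (a :> rest)) = go (acc + a) rest

main :: IO ()
main = do
  xs <- toList (mapped showElem (each [1, 2, 3]))
  print xs  -- prints ["1","2","3"]
  ys <- toList (mapped sumGroup (yieldGroup [1, 2] (yieldGroup [3, 4, 5] (Return ()))))
  print ys  -- prints [3,12]
```

With the yielded element baked into the Step constructor, `mapped` would need separate definitions for the grouped and ungrouped stream types.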
danidiaz
  • Also, note that the `separate` http://hackage.haskell.org/package/streaming-0.2.3.0/docs/Streaming-Prelude.html#v:separate and `unseparate` functions from `Streaming.Prelude` use the `Sum` functor. – danidiaz Nov 26 '19 at 08:08
  • What I don't get is why I can't repeat that argument with `Of a` replaced by just `a`. – mcmayer Nov 26 '19 at 08:46
  • @mcmayer In the simple non-grouped case, there's no problem in going directly to the rest of the stream, because yielded values already have been "materialized" so to speak. There's nothing left to read in order to advance. But with groups, we actually perform effects when going through the group. The inner stream lives in `IO` after all! We can't simply jump to the next group directly. Streaming libraries that fully materialize groups before sending them downstream don't have this distinction. – danidiaz Nov 26 '19 at 08:53
  • @mcmayer "repeat that argument with Of a replaced by just a". If you remove `Of` and bake the yielded element into the `Step` constructor, then you need to define different `Stream` types for the grouped and ungrouped case, and no functions can be reused between the two. There are packages like "streaming-bytestring" http://hackage.haskell.org/package/streaming-bytestring that actually do this for performance reasons, as it reduces indirections. – danidiaz Nov 26 '19 at 13:25