The Haskell streaming library's documentation has this example of S.copy:

>>> (S.toList . mapped S.toList . chunksOf 5) $  (S.toList . mapped S.toList . chunksOf 3) $ S.copy $ each [1..10]
[[1,2,3,4,5],[6,7,8,9,10]] :> ([[1,2,3],[4,5,6],[7,8,9],[10]] :> ())

Is it possible to separate this into two "clean" streams that print the results below?

>>> S.print stream1
[[1,2,3,4,5],[6,7,8,9,10]]
>>> S.print stream2
[[1,2,3],[4,5,6],[7,8,9],[10]]

Note that there is no ':>' in the results above. More generally, I am not sure whether there are functions that 'simplify' nested streams (or streams of streams), removing a layer from either the m or the (Of a) part of Stream (Of a) m r:

f1 :: Stream (Of a) (Stream (Of b) m) r -> Stream (Of b) m r
f2 :: Stream (Of a) (Stream (Of b) m) r -> Stream (Of a) m r
f3 :: Stream (Stream (Of a) m) r -> Stream (Of a) m r

[Update]

The background of this question is that I am looking for idiomatic ways to reuse an underlying stream multiple times. The stream is pulled from a database, and the IO can be expensive. I also want references to the intermediate streams so that I can structure my code better. Some mock code:

myStreamFn = do
  original_stream <- pull_from_database 
  let  (o1, s1) = calc_moving_average $ S.copy original_stream
       (o2, s2) = calc_max $ S.copy o1
       (o3, s3) = calc_min $ S.copy o2
  S.print $ S.zipWith3 (\x y z-> (x, y, z)) s1 s2 s3

What I want is for o1, o2 and o3 to be exactly the same as original_stream, with the pull_from_database IO operation performed only once, when original_stream is pulled.

Kevin Zhu
  • If you want to use it multiple times, then you don't want it to be a `Stream`. A `Stream` is essentially a *control structure* that can be manipulated *somewhat* like a data structure. But that's really more by analogy than anything else. – dfeuer Feb 19 '20 at 02:32
  • If you have a `Stream (Of x) IO y`, you can think of that as a sort of button. When you push the button, either you'll get an `x` and the button will be available to push again or you'll get a `y` and the button will deactivate itself. – dfeuer Feb 19 '20 at 02:35
  • @dfeuer thanks for your comment. Looks to me S.copy function itself shows that a stream can be used multiple times. In the documentation http://hackage.haskell.org/package/streaming-0.2.3.0/docs/Streaming-Prelude.html it says "Thus I can independently filter and write to one file, but nub and write to another, or interact with a database and a logfile and the like". I am looking for ways to get a reference to these intermediate streams so that i can manipulate the streams further in other functions. – Kevin Zhu Feb 19 '20 at 02:36
  • It can be "used multiple times", but only in an interleaved fashion. – dfeuer Feb 20 '20 at 03:28

1 Answer

f1 = S.effects @(Stream (Of _) _) 
  :: Monad m 
  => Stream (Of a) (Stream (Of b) m) r 
  -> Stream (Of b) m r
f2 = hoist @(Stream (Of _)) S.effects
  :: Monad m
  => Stream (Of a) (Stream (Of b) m) r
  -> Stream (Of a) m r

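(Type variables renamed for clarity; see the docs for effects.) f3 doesn't kind-check: Stream takes three arguments (a functor, a monad and a return type), and Stream (Stream (Of a) m) r supplies only two.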
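
As a small sketch of what these two functions do, the snippet below builds a two-layer stream with S.copy, scales only the outer copy so the layers can be told apart, and then strips one layer at a time. The names keepInner, keepOuter, layered, innerElems and outerElems are made up for illustration; only S.effects, hoist, S.copy, S.map, S.each and S.toList_ come from the library.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Of, Stream, hoist)
import qualified Streaming.Prelude as S

-- Run the outer layer for its effects only; the inner stream survives (f1).
keepInner :: Monad m => Stream (Of a) (Stream (Of b) m) r -> Stream (Of b) m r
keepInner = S.effects

-- Run the inner layer for its effects only; the outer stream survives (f2).
keepOuter :: Monad m => Stream (Of a) (Stream (Of b) m) r -> Stream (Of a) m r
keepOuter = hoist S.effects

-- Hypothetical two-layer stream: the outer copy is scaled by 10,
-- the inner copy is left untouched.
layered :: Monad m => Stream (Of Int) (Stream (Of Int) m) ()
layered = S.map (* 10) (S.copy (S.each [1, 2, 3]))

innerElems, outerElems :: [Int]
innerElems = runIdentity (S.toList_ (keepInner layered))
outerElems = runIdentity (S.toList_ (keepOuter layered))
```

Here innerElems should be [1,2,3] and outerElems should be [10,20,30]. Note that each function throws one layer's elements away; neither gives you both layers as independent streams.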

It feels like you're trying to defeat the point of streaming. You construct the pipeline, source to sink, and run it; the key is that there's no (implicit) intermediate accumulation of values. Your question is slightly loose and thus difficult to answer precisely, but if you wish to run all the effects of the first stream and then all the effects of the second stream, you must be willing to store the (computation representing the) second stream until the first stream has finished running its effects. At that point you've accumulated the second stream (and thus not really streamed it). This is why S.copy is designed for interleaving the effects. Cf. this github issue.
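
To make the interleaving visible, here is a rough sketch that attaches one logging consumer to each layer of a copied stream and records the order in which elements are seen. The name consumptionLog and the "inner"/"outer" tags are invented for this example; a pure S.each source stands in for the database.

```haskell
import Control.Monad.IO.Class (liftIO)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)
import qualified Streaming.Prelude as S

-- Returns the order in which two consumers of a copied stream saw elements.
consumptionLog :: IO [(String, Int)]
consumptionLog = do
  ref <- newIORef [] :: IO (IORef [(String, Int)])
  let note tag x = modifyIORef ref ((tag, x) :)
  S.mapM_ (note "inner")               -- consumer of the inner copy
    $ S.mapM_ (liftIO . note "outer")  -- consumer of the outer copy
    $ S.copy (S.each [1, 2, 3])
  -- Entries were prepended, so reverse to get chronological order.
  reverse <$> readIORef ref
```

The numbers in the log come out as 1,1,2,2,3,3: both consumers handle an element before the next one is pulled from the source, so neither consumer ever runs ahead of the other.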

[Response to Update]

I think part of what's confusing you is that you're using pure streams; in the absence of effects, the restrictions are less obviously motivated. Use identifiers for the components of the pipeline, not the partial results. Also, in your example you should be combining folds, e.g.

import qualified Control.Foldl as L
import qualified Streaming.Prelude as S

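myStreamFn =
  let movingAvg n = {-# ... #-}
      -- one fold that computes all three results in a single pass
      combinedAcc = (,,) <$> L.minimum <*> L.maximum <*> movingAvg 10
  -- S.fold_ returns the fold result in the base monad, so use print, not S.print
  in  print =<< L.purely S.fold_ combinedAcc pullFromDatabase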
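
A self-contained sketch of the combined-fold idea follows. Two things are assumptions of mine: L.mean is used as a placeholder where the moving average would go, and the names summary, summarize, runningSummary and finalSummary are invented. If you want a stream of running results rather than one final value, L.purely S.scan does that with the same fold.

```haskell
import qualified Control.Foldl as L
import Data.Functor.Identity (runIdentity)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

type Summary = (Maybe Double, Maybe Double, Double)

-- One fold that tracks minimum, maximum and mean together.
-- L.mean is only a placeholder for the moving average.
summary :: L.Fold Double Summary
summary = (,,) <$> L.minimum <*> L.maximum <*> L.mean

-- Consume the stream once and return the final summary.
summarize :: Monad m => Stream (Of Double) m r -> m Summary
summarize = L.purely S.fold_ summary

-- Consume the stream once and emit the running summary as a new stream.
runningSummary :: Monad m => Stream (Of Double) m r -> Stream (Of Summary) m r
runningSummary = L.purely S.scan summary

finalSummary :: Summary
finalSummary = runIdentity (summarize (S.each [1, 2, 3, 4]))
```

For the input 1, 2, 3, 4, finalSummary is (Just 1.0, Just 4.0, 2.5). The source is traversed once, however many folds are combined.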

Another function you might like to consider is S.store, e.g.

myStreamFn 
  = pullFromDatabase
  & S.store S.maximum
  & S.store (L.purely S.fold L.minimum)
  & S.store movingAvg
  & S.print
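
Here is a runnable variant of the S.store pipeline, as a sketch only. L.length stands in for the moving average, the final consumer is left open instead of being fixed to S.print, and the names withSummaries and storeExample are hypothetical.

```haskell
import qualified Control.Foldl as L
import Data.Function ((&))
import Data.Functor.Identity (runIdentity)
import Streaming (Of (..), Stream)
import qualified Streaming.Prelude as S

-- Each S.store attaches one consumer and passes the elements on unchanged.
-- The results nest in the return value, with the last store outermost.
withSummaries
  :: Monad m
  => Stream (Of Int) m r
  -> Stream (Of Int) m (Of Int (Of (Maybe Int) (Of (Maybe Int) r)))
withSummaries src =
  src
    & S.store S.maximum
    & S.store (L.purely S.fold L.minimum)
    & S.store (L.purely S.fold L.length) -- placeholder for movingAvg

-- Collect the passed-through elements together with the stored results.
storeExample :: Of [Int] (Of Int (Of (Maybe Int) (Of (Maybe Int) ())))
storeExample = runIdentity (S.toList (withSummaries (S.each [3, 1, 2])))
```

For the input 3, 1, 2, storeExample is [3,1,2] :> 3 :> Just 1 :> Just 3 :> (), i.e. the elements, then the length, the minimum and the maximum. Ending the pipeline with S.print instead would print each element and return the nested results.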
moonGoose
  • Thank you for your answer. I have updated my question to make it clearer. I am actually looking for 'references' to the intermediate stream and original stream so that I can use original stream multiple times. – Kevin Zhu Feb 19 '20 at 01:49
  • I have updated my answer too. The key is that you should not be looking for references to the 'intermediate stream' - because it doesn't exist anywhere. What exists is the capability to get the next element (or end stream). To use the original stream multiple times, you have to split that capability between consumers. Use identifiers for the consumers instead. – moonGoose Feb 19 '20 at 19:55