
I'm using streaming-utils to stream an HTTP response body. I want to track progress, similar to what bytestring-progress does for lazy ByteStrings. I suspect something like toChunks is needed, then accumulating the total bytes read while returning the original stream unmodified. But I can't figure it out, and the streaming documentation is very unhelpful: it is mostly grandiose comparisons with alternative libraries.

Here's my best effort so far. It doesn't do any counting yet. It only tries to print the size of each chunk as it streams past, and it doesn't compile.

download :: ByteString -> FilePath -> IO ()
download i file = do
  req <- parseRequest . C.unpack $ i
  m <- newHttpClientManager
  runResourceT $ do
    resp <- http req m
    lift . traceIO $ "downloading " <> file
    let body = SBS.fromChunks $ mapsM step $ SBS.toChunks $ responseBody resp
    SBS.writeFile file body

step bs = do
  traceIO $ "got " <> show (C.length bs) <> " bytes"
  return bs
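For the per-chunk printing alone, one likely culprit is mapsM. It maps over whole functor layers with a function of type `forall x. f x -> m (g x)`, while step works on individual elements. Element-wise effects are what S.mapM is for, or S.chain when the element is passed through unchanged. step also runs in IO while the body is streamed in ResourceT IO, so its effect needs liftIO. Below is a minimal sketch, assuming Streaming.Prelude from the streaming package. logChunks is a hypothetical name, and the HTTP part is replaced by S.each so the example runs without a network:

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.IO.Class (MonadIO, liftIO)
import qualified Data.ByteString as B
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Hypothetical helper: print each chunk's size as it streams past and
-- re-yield the chunk unchanged. S.chain runs the action once per element.
-- The MonadIO constraint lets the same function run in IO or ResourceT IO.
logChunks :: MonadIO m => Stream (Of B.ByteString) m r -> Stream (Of B.ByteString) m r
logChunks = S.chain (\bs -> liftIO (putStrLn ("got " ++ show (B.length bs) ++ " bytes")))

main :: IO ()
main = do
  -- Stand-in for the HTTP body: a stream of three chunks.
  chunks <- S.toList_ (logChunks (S.each ["abc", "de", "f"]))
  print chunks
```

In the question's download, this would presumably slot in as `SBS.fromChunks (logChunks (SBS.toChunks (responseBody resp)))`. That assumes SBS is the streaming-bytestring module whose toChunks and fromChunks the question already uses.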
Matt Joiner

1 Answer


What we want is to traverse the Stream (Of ByteString) IO () in two ways:

  • One that accumulates the incoming lengths of the ByteStrings and prints updates to console.
  • One that writes the stream to a file.

We can do that with the help of the copy function, which has type:

copy :: Monad m => Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r

copy takes a stream and duplicates it into two monadic layers: each element of the original stream is emitted by both layers of the resulting stream.
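To see both layers concretely, here is a small sketch that assumes the streaming package (bothLayers is a hypothetical name). It drains the outer layer with S.toList_, which leaves a stream in the base monad, and then drains that inner layer with S.toList. Both lists should come out equal, since every element is emitted by both layers:

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Of (..))
import qualified Streaming.Prelude as S

-- Collect both layers produced by S.copy.
-- S.toList_ drains the outer layer; its result lives in the base monad,
-- which is itself a stream (the inner layer), drained by S.toList.
-- The result is innerElements :> outerElements.
bothLayers :: [Int] -> Of [Int] [Int]
bothLayers xs = runIdentity (S.toList (S.toList_ (S.copy (S.each xs))))

main :: IO ()
main = print (bothLayers [1, 2, 3])
```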

(Notice that we are changing the base monad, not the functor. Changing the functor to another Stream would delimit groups within a single stream, which is not what we want here.)
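For contrast, here is a sketch of changing the functor instead. It assumes chunksOf and mapped from the Streaming module, and groupsOf is a hypothetical name. chunksOf turns a `Stream (Of a) m r` into a `Stream (Stream (Of a) m) m r`: one stream cut into consecutive groups rather than two parallel layers.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (chunksOf, mapped)
import qualified Streaming.Prelude as S

-- Cut one stream into groups of n, then collapse each group into a list.
-- chunksOf n :: Stream (Of a) m r -> Stream (Stream (Of a) m) m r
groupsOf :: Int -> [a] -> [[a]]
groupsOf n xs = runIdentity (S.toList_ (mapped S.toList (chunksOf n (S.each xs))))

main :: IO ()
main = print (groupsOf 2 [1 .. 5 :: Int])
```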

The following function takes a stream, copies it, accumulates the lengths of the incoming strings with S.scan, prints the running totals, and returns another stream that you can still work with, for example by writing it to a file:

{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.IO.Class (liftIO)
import Streaming
import qualified Streaming.Prelude as S
import qualified Data.ByteString as B

track :: Stream (Of B.ByteString) IO r -> Stream (Of B.ByteString) IO r
track stream =
      S.mapM_ (liftIO . print) -- brings us back to the base monad, here another stream
    . S.scan (\s b -> s + B.length b) (0::Int) id -- running total of bytes seen so far
    $ S.copy stream -- every element is emitted by both layers

This prints the ByteStrings along with the accumulated lengths. Note that S.scan also emits its initial value, 0, before the first chunk arrives:

main :: IO ()
main = S.mapM_ B.putStr . track $ S.each ["aa","bb","c"]
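The track above is specialised to IO, but the question's download streams its body inside runResourceT. Below is a sketch of the same copy/scan pipeline generalised to any MonadIO m, with the reporting action passed in as a parameter. trackWith is a hypothetical name, and the IORef in main only stands in for a real progress display:

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.IO.Class (MonadIO, liftIO)
import qualified Data.ByteString as B
import Data.IORef (modifyIORef, newIORef, readIORef)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Same idea as track: copy the stream, fold running byte totals over the
-- outer layer, and hand each total to `report`. What remains is the
-- untouched inner layer, which the caller keeps consuming.
trackWith
  :: MonadIO m
  => (Int -> IO ())
  -> Stream (Of B.ByteString) m r
  -> Stream (Of B.ByteString) m r
trackWith report =
    S.mapM_ (liftIO . report)
  . S.scan (\total chunk -> total + B.length chunk) 0 id
  . S.copy

main :: IO ()
main = do
  totals <- newIORef []
  chunks <- S.toList_ (trackWith (\n -> modifyIORef totals (n :)) (S.each ["aa", "bb", "c"]))
  reported <- reverse <$> readIORef totals
  print (chunks, reported)
```

The reported totals start with 0 because S.scan emits its initial accumulator. Inside the question's runResourceT block this would presumably be used as `SBS.fromChunks (trackWith print (SBS.toChunks (responseBody resp)))` before SBS.writeFile, assuming the SBS functions the question already imports.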
danidiaz
  • That's a bit mindblowing. The outer stream originally returned by S.copy is going to drive the inner one? – Matt Joiner Mar 06 '18 at 07:38
  • @MattJoiner It's more as if the elements were emitted in lockstep. In fact you can use `hoist` to work with the inner `Stream` first: http://hackage.haskell.org/package/streaming-0.2.0.0/docs/Streaming.html#v:hoist – danidiaz Mar 06 '18 at 18:52
  • @MattJoiner, for whatever it's worth, `copy` is a special case of `expand :: (Monad m, Functor f) => (forall a b. (g a -> b) -> f a -> h b) -> Stream f m r -> Stream g (Stream h m) r`. Maybe that will help your intuition somehow. Certainly, its implementation is much more constrained by its type. – dfeuer Mar 07 '18 at 07:31
  • @danidiaz Is it possible to make it nonblocking? I would like to call `track` repeatedly, and display the download progress on screen at 60FPS (with an `(a -> IO a) -> IO ()` function [example](https://hackage.haskell.org/package/gloss-1.13.2.1/docs/Graphics-Gloss-Interface-IO-Game.html#v:playIO)). – Ford O. May 31 '21 at 14:41
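The expand and hoist comments can be checked with a small sketch. It assumes the Streaming module of streaming ≥ 0.2 exports expand and hoist, and copyViaExpand and innerScaled are hypothetical names. The first function builds copy-like behaviour from expand, as dfeuer describes. The second uses hoist, as danidiaz suggests, to transform the inner layer independently of the outer one:

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Of (..), Stream, expand, hoist)
import qualified Streaming.Prelude as S

-- copy-like behaviour via expand: each element x is first yielded by the
-- inner (base-monad) layer, and the same step is then passed to the outer layer.
copyViaExpand :: Monad m => Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r
copyViaExpand = expand (\p (x :> rest) -> x :> p (x :> rest))

-- hoist applies a transformation to every base-monad effect of the outer
-- stream, i.e. to the inner layer; here it multiplies inner elements by 10
-- while the outer layer keeps the original values.
innerScaled :: [Int] -> Of [Int] [Int]
innerScaled xs = runIdentity (S.toList (S.toList_ (hoist (S.map (* 10)) (S.copy (S.each xs)))))

main :: IO ()
main = do
  -- each result is innerList :> outerList
  print (runIdentity (S.toList (S.toList_ (copyViaExpand (S.each [1, 2, 3 :: Int])))))
  print (innerScaled [1, 2, 3])
```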