I have a value body :: BS.ByteString (ResourceT IO) (), produced by a function based on BS.readFile. I want to stream that value as the response body from a Wai Application. The streaming-wai package has a helper, streamingResponse, that takes a value of type Stream (Of ByteString) IO r. I can convert my BS.ByteString (ResourceT IO) () to Stream (Of ByteString) (ResourceT IO) () with BS.toChunks, but the result still carries the extra ResourceT monad layer. Passing the body to streamingResponse gives me:

Couldn't match type ‘ResourceT IO’ with ‘IO’
  Expected type: Stream (Of ByteString) IO ()
    Actual type: Stream (Of ByteString) (ResourceT IO) ()

I've tried various things like wrapping things in runResourceT, binding and hoisting values etc. but really have no idea how to proceed. Here's the line in the full project if extra context is required.

Update0

hoist runResourceT body seems to type check. Someone also referred me to a Haskell Pipes thread, which may be a very related problem, and possible hint toward a solution.

Matt Joiner
  • `hoist runResourceT body` typechecks, but I don't think it will actually do what you want it to do. I suspect it will treat every "step" of the stream as an isolated resource context, which is not the desired behaviour. – danidiaz May 03 '18 at 20:41
  • Agreed. Pretty sure `runResourceT $ m >>= f` is not equivalent to `runResourceT m >>= runResourceT . f`, which is the property you need to apply `hoist`. – Jack Kelly May 03 '18 at 20:57
  • Yes I can confirm `hoist runResourceT body` does not work the way I intend. – Matt Joiner May 04 '18 at 02:22
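
The concern raised in these comments can be checked with a small experiment. The following is a minimal sketch, assuming the streaming and resourcet packages; the names logEvent, guarded and hoistedEvents are hypothetical and exist only for this illustration. It registers a pretend resource with allocate, hoists the stream with runResourceT, and records the order in which events happen.

```haskell
import           Control.Monad.Trans.Resource (ResourceT, allocate, runResourceT)
import           Data.IORef                   (IORef, modifyIORef, newIORef, readIORef)
import           Streaming
import qualified Streaming.Prelude            as S

-- Appends an event to a log so the interleaving can be inspected afterwards.
logEvent :: IORef [String] -> String -> IO ()
logEvent ref msg = modifyIORef ref (++ [msg])

-- A stream that acquires a pretend resource and then yields two elements.
guarded :: IORef [String] -> Stream (Of Int) (ResourceT IO) ()
guarded ref = do
    _ <- lift (allocate (logEvent ref "acquire") (\_ -> logEvent ref "release"))
    S.yield 1
    S.yield 2

-- Consumes the hoisted stream and returns the recorded event order.
hoistedEvents :: IO [String]
hoistedEvents = do
    ref <- newIORef []
    S.mapM_ (\n -> logEvent ref ("element " ++ show n))
            (hoist runResourceT (guarded ref))
    readIORef ref
```

Because hoist applies runResourceT to each effectful layer separately, the release action should be logged right after the acquire, before any element is consumed. For a file handle, that would mean the handle is closed before its contents are read.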

2 Answers

If we want to allow Streams that live in ResourceT, we can do without the functions from streaming-wai (which only work for Streams based on IO) and instead build on top of functions like responseStream from the wai package (module Network.Wai):

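import           Control.Monad                (void)
import           Control.Monad.IO.Class       (liftIO)
import           Control.Monad.Trans.Resource (ResourceT, runResourceT)
import           Data.ByteString              (ByteString)
import           Data.ByteString.Builder      (byteString)
import           Network.HTTP.Types           (ResponseHeaders, Status)
import           Network.Wai                  (Response, StreamingBody, responseStream)
import           Streaming
import qualified Streaming.Prelude            as S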

streamingResponseR :: Stream (Of ByteString) (ResourceT IO) r
                   -> Status
                   -> ResponseHeaders
                   -> Response
streamingResponseR stream status headers =
    responseStream status headers streamingBody
    where
    streamingBody writeBuilder flush =
        let writer a =
                do liftIO (writeBuilder (byteString a))
                    -- flushes for every produced bytestring, perhaps not optimal
                   liftIO flush
         in runResourceT $ void $ S.effects $ S.for stream writer

streamingBody has type StreamingBody, which is actually a type synonym for a function (Builder -> IO ()) -> IO () -> IO () that takes a write callback and a flush callback as parameters, and uses them to write the response using some source of data that is in scope. (Note that these callbacks are provided by WAI, not by the user.)

In our case, the source of data is a Stream that lives in ResourceT. We need to lift the write and flush callbacks (which live in IO) using liftIO, and also remember to invoke runResourceT at the end so that the whole thing becomes a plain IO action.


What if we wanted to flush the response only after the accumulated length of the emitted bytestrings reached some limit?

We would need a function (not implemented here) to create a division each time the limit is reached:

breaks' :: Monad m 
        => Int 
        -> Stream (Of ByteString) m r 
        -> Stream (Stream (Of ByteString) m) m r
breaks' breakSize = undefined
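
One possible way to fill in such a function is sketched below. This is an illustration rather than a definitive implementation: the name breaksBySize is hypothetical, and the sketch assumes that a group should be closed as soon as the accumulated length of its chunks reaches the limit, without splitting any individual chunk.

```haskell
import           Data.ByteString   (ByteString)
import qualified Data.ByteString   as B
import           Streaming
import qualified Streaming.Prelude as S

-- Hypothetical stand-in for breaks': groups chunks until their combined
-- length reaches breakSize, then starts a new group. Chunks are never split,
-- so a group may exceed breakSize by up to one chunk.
breaksBySize :: Monad m
             => Int
             -> Stream (Of ByteString) m r
             -> Stream (Stream (Of ByteString) m) m r
breaksBySize breakSize = loop
  where
    -- Start a new group only if the source still has a chunk to offer.
    loop stream = effect $ do
        step <- S.next stream
        pure $ case step of
            Left r              -> pure r
            Right (chunk, rest) -> wrap (fmap loop (group (B.length chunk) chunk rest))

    -- Emit the current chunk, then keep pulling chunks until the limit is
    -- reached. The return value is the remainder of the source stream.
    group acc chunk rest = do
        S.yield chunk
        if acc >= breakSize
            then pure rest
            else do
                step <- lift (S.next rest)
                case step of
                    Left r                -> pure (pure r)
                    Right (chunk', rest') -> group (acc + B.length chunk') chunk' rest'
```

The groups can be inspected with S.mapped S.toList, which turns each inner stream into a list of its chunks.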

And then we could intercalate the flushing action between each group using intercalates, before writing the stream:

streamingBodyFrom :: Stream (Of ByteString) (ResourceT IO) () 
                  -> Int 
                  -> StreamingBody
streamingBodyFrom stream breakSize writeBuilder flush =
    let writer a = liftIO (writeBuilder (byteString a))
        flusher = liftIO flush
        broken = breaks' breakSize stream
     in runResourceT . S.mapM_ writer . intercalates flusher $ broken
danidiaz

Instead of readFile, would withFile + hSetBinaryMode + Data.ByteString.Streaming.fromHandle be enough?

fromHandle produces a ByteString IO () that can be converted to a Stream (Of ByteString) IO () that streamingResponse or streamingBody can accept.

There's the issue of where to put the withFile bracketing operation. According to the WAI documentation, you can wrap it around the code inside your Application that builds and sends the response:

Note that, since WAI 3.0, this type is structured in continuation passing style to allow for proper safe resource handling. This was handled in the past via other means (e.g., ResourceT).

[...]

In order to allocate resources in an exception-safe manner, you can use the bracket pattern outside of the call to responseStream.


Note: the documentation of streaming-bytestring says that fromHandle closes the handle automatically when EOF is reached. Looking at the implementation, that doesn't seem to be the case. You need withFile to properly close the handle.
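
A minimal sketch of this arrangement follows. It assumes a release of streaming-bytestring that still exposes the Data.ByteString.Streaming module (newer releases appear to use Streaming.ByteString instead), and the names fileBody and fileApp are hypothetical. withBinaryFile is used here as a shorthand for withFile followed by hSetBinaryMode.

```haskell
import           Data.ByteString.Builder   (byteString)
import qualified Data.ByteString.Streaming as Q
import           Network.HTTP.Types        (status200)
import           Network.Wai               (Application, StreamingBody, responseStream)
import qualified Streaming.Prelude         as S
import           System.IO                 (Handle, IOMode (ReadMode), withBinaryFile)

-- Writes every chunk read from the handle through WAI's write callback,
-- then flushes once at the end.
fileBody :: Handle -> StreamingBody
fileBody h write flush = do
    S.mapM_ (write . byteString) (Q.toChunks (Q.fromHandle h))
    flush

-- The bracket (withBinaryFile) sits outside the call to responseStream, so
-- the handle stays open while the response is sent and is closed afterwards,
-- even if an exception is thrown.
fileApp :: FilePath -> Application
fileApp path _request respond =
    withBinaryFile path ReadMode $ \h ->
        respond (responseStream status200 [] (fileBody h))
```

With this shape no ResourceT layer is needed, because the continuation-passing style of Application lets the bracket cover the whole lifetime of the response.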

danidiaz
  • Related to the topic of resource handling in WAI: https://stackoverflow.com/questions/47517973/what-forces-drove-wai-application-to-be-redesigned-five-times – danidiaz May 03 '18 at 21:18
  • Hmm yeah, that would work, but I'm trying to maintain the body to be written behind an interface, I don't want any Handle type stuff leaking back across. Perhaps I could restructure the interface to return an action to allow embedding it in the bracket operation. [Here](https://github.com/anacrolix/transcoder/blob/e38d7b573a43e01b69d7618cbec3782792079b0e/Main.hs#L436)'s the interface for reference. – Matt Joiner May 04 '18 at 02:26
  • @MattJoiner Perhaps you could structure your interface in continuation-passing style, something like `OpId -> FileLength -> (BS.ByteString (ResourceT IO) () -> IO r) -> IO r`. That would hide the gory details of handle management from the user while still letting you employ bracketing operations. Packages like "managed" http://hackage.haskell.org/package/managed are built around this pattern. The alternative type `OpId -> FileLength -> (BS.ByteString (ResourceT IO) x -> IO (r,x)) -> IO (r,x)` would force the user to exhaust the stream when consuming it. – danidiaz May 04 '18 at 05:21
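
The continuation-passing idea from the last comment can be sketched in a simplified form. The name withFileBody is hypothetical, the OpId and FileLength parameters of the real interface are replaced by a plain FilePath, and the stream lives in IO rather than ResourceT IO because the bracket already takes care of the handle. The module name Data.ByteString.Streaming is an assumption that depends on the installed version of streaming-bytestring.

```haskell
import qualified Data.ByteString.Streaming as Q
import           System.IO                 (IOMode (ReadMode), withBinaryFile)

-- The caller receives a streaming ByteString but never sees the Handle.
-- The stream must be consumed inside the continuation, because the handle
-- is closed as soon as the continuation returns.
withFileBody :: FilePath -> (Q.ByteString IO () -> IO r) -> IO r
withFileBody path consume =
    withBinaryFile path ReadMode (consume . Q.fromHandle)
```

For example, withFileBody path Q.length_ would count the bytes of the file without exposing any handle management to the caller.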