If we want to allow Stream
s that live in ResourceT
, we can do without the functions from streaming-wai (that only work for Stream
s based on IO
) and instead build on top of functions like responseStream
from network-wai:
import Control.Monad.Trans.Resource
import Network.Wai
import Streaming
import qualified Streaming.Prelude as S
import Data.ByteString.Builder (byteString, Builder)
streamingResponseR :: Stream (Of ByteString) (ResourceT IO) r
-> Status
-> ResponseHeaders
-> Response
streamingResponseR stream status headers =
responseStream status headers streamingBody
where
streamingBody writeBuilder flush =
let writer a =
do liftIO (writeBuilder (byteString a))
-- flushes for every produced bytestring, perhaps not optimal
liftIO flush
in runResourceT $ void $ S.effects $ S.for stream writer
streamingBody
has type StreamingBody
, which is actually a type synonym for a function (Builder -> IO ()) -> IO () -> IO ()
that takes a write callback and a flush callback as parameters, and uses them to write the response using some source of data that is in scope. (Note that these callbacks are provided by WAI, not by the user.)
In our case, the source of data is a Stream
that lives in ResourceT
. We need to lift the write and flush callbacks (that live in IO
) using liftIO
, an also remember to invoke runResourceT
to return a plain IO
action at the end.
What if we wanted to flush the response only after the accumulated length of the emitted bytestrings reached some limit?
We would need a function (not implemented here) to create a division each time the limit is reached:
breaks' :: Monad m
=> Int
-> Stream (Of ByteString) m r
-> Stream (Stream (Of ByteString) m) m r
breaks' breakSize = undefined
And then we could intercalate the flushing action between each group using intercalates
, before writing the stream:
streamingBodyFrom :: Stream (Of ByteString) (ResourceT IO) ()
-> Int
-> StreamingBody
streamingBodyFrom stream breakSize writeBuilder flush =
let writer a = liftIO (writeBuilder (byteString a))
flusher = liftIO flush
broken = breaks' breakSize stream
in runResourceT . S.mapM_ writer . S.intercalates flusher $ broken