
Here is a simplification of my code:

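import Control.Exception (bracket)
import Control.Monad.Trans.Resource (MonadResource, runResourceT)
import Data.Function ((&))
import Database.PostgreSQL.Simple (Connection)
import qualified Streaming.Prelude as S
import qualified Streaming.ByteString.Char8 as C
import Streaming.Zip (gunzip)
import Streaming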

main :: IO ()
main = do
  res <- runResourceT $ calculateA myLinesStream
  return ()

type MyLinesStream m r = S.Stream (S.Of String) m r

connect :: IO Connection
connect = undefined

close :: Connection -> IO ()
close = undefined

calculateA :: MonadIO m => MyLinesStream m r -> m ()
calculateA stream = liftIO (bracket connect close (go stream))
  where
    go :: MonadIO m => MyLinesStream m r -> Connection -> m ()
    go stream conn = stream & S.length_ >>= liftIO . print

myLinesStream :: (MonadIO m, MonadResource m) => MyLinesStream m ()
myLinesStream = do
  S.each ["1.zip", "2.zip"]
    & S.mapM (\fileName -> C.readFile fileName & gunzip)
    & S.mconcat
    & C.lines
    & mapsM (S.toList . C.unpack)
    & void

There is a type error on the following line, at the go stream argument:

calculateA stream = liftIO (bracket connect close (go stream))

The error says:

Couldn't match type ‘m’ with ‘IO’
  ‘m’ is a rigid type variable bound by
    the type signature for:
      calculateA :: forall (m :: * -> *) r.
                    MonadIO m =>
                    MyLinesStream m r -> m ()
Expected type: Connection -> IO ()
    Actual type: Connection -> m ()

Questions

  1. What should I do to make this code typecheck while still releasing resources safely in the calculateA function?
  2. I'm reading multiple files using C.readFile and then running the whole thing inside runResourceT. Will this properly release all the file handles?
  3. Is the composition good? (Note that I need the calculateA function to stay separate from myLinesStream.)
xbalaj

1 Answer


The problem is that you are trying to use bracket with a monad that is too general. bracket has the signature:

bracket :: IO a -> (a -> IO b) -> (a -> IO c) -> IO c

It takes IO actions as parameters. However, the go function that you pass to bracket needs to work in a generic base monad m chosen by the caller of calculateA (you later make that monad be ResourceT IO in main).
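To make the mismatch concrete, here is a minimal sketch of a variant that does typecheck with the bracket from base, because everything is pinned to IO. The name calculateAIO and the acquire/dispose parameters are hypothetical stand-ins for the question's calculateA, connect and close; the IORef log only exists to show the order of events.

```haskell
import Control.Exception (bracket)
import Data.IORef (modifyIORef, newIORef, readIORef)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Hypothetical IO-only variant of calculateA. Because the stream's base
-- monad is fixed to IO, the third argument of bracket has the type
-- conn -> IO Int that bracket expects.
calculateAIO :: IO conn -> (conn -> IO ()) -> Stream (Of String) IO r -> IO Int
calculateAIO acquire dispose stream =
  bracket acquire dispose (\_conn -> S.length_ stream)

-- Runs the variant with fake acquire/dispose actions that only log.
demoBracket :: IO (Int, [String])
demoBracket = do
  logRef <- newIORef []
  let record msg = modifyIORef logRef (msg :)
  n <- calculateAIO (record "connect") (\_ -> record "close") (S.each ["a", "b"])
  events <- reverse <$> readIORef logRef
  pure (n, events)
```

This variant cannot consume myLinesStream, though: that stream requires a MonadResource base monad, and plain IO is not one.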

The bracket from base and ResourceT don't mix very well. Instead, you need to turn to special functions from the resourcet package like allocate and release, and use them to define a helper like:

bracketStream :: (Functor f, MonadResource m) 
              => IO a 
              -> (a -> IO ()) 
              -> (a -> Stream f m b) 
              -> Stream f m b
bracketStream alloc free inside = do
        (key, seed) <- lift (allocate alloc free)
        r <- inside seed
        release key
        pure r

How does it work? If you have a stream of Xs, it prepends an allocation action at the beginning of the stream (registering the corresponding cleanup action to be called in case of abnormal termination, like exceptions) and it also adds an explicit call to the cleanup action when the stream is exhausted:

(allocate+register cleanup) X X X ... X (cleanup)
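Because calculateA consumes the stream and returns a plain monadic result rather than another stream, the same allocate/release pair can also be used directly in the base monad. The following is only a sketch under assumptions: the connect and close actions are passed in as parameters (named acquire and dispose here) so that the example does not depend on a database, and the function returns the count instead of printing it.

```haskell
import Control.Monad.Trans.Resource (MonadResource, allocate, release, runResourceT)
import Data.IORef (modifyIORef, newIORef, readIORef)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Sketch of calculateA on top of resourcet. acquire/dispose stand in
-- for the question's connect and close.
calculateA :: MonadResource m
           => IO conn
           -> (conn -> IO ())
           -> Stream (Of String) m r
           -> m Int
calculateA acquire dispose stream = do
  -- Registers dispose so that it also runs if an exception escapes.
  (key, _conn) <- allocate acquire dispose
  n <- S.length_ stream
  -- Early, explicit cleanup once the stream is exhausted.
  release key
  pure n

-- Runs the sketch with fake acquire/dispose actions that only log.
demoCalculateA :: IO (Int, [String])
demoCalculateA = do
  logRef <- newIORef []
  let record msg = modifyIORef logRef (msg :)
  n <- runResourceT $
         calculateA (record "connect") (\_ -> record "close") (S.each ["a", "b", "c"])
  events <- reverse <$> readIORef logRef
  pure (n, events)
```

Since release has already run the cleanup action, runResourceT does not run it a second time, so the log contains a single "close".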

You wrote:

I'm reading multiple files using C.readFile and then wrapping it inside runResourceT. Will this properly release all the file handles?

Yes. With ResourceT, resources are freed either when an explicit cleanup action is performed, or when we "exit" the ResourceT with runResourceT (perhaps abnormally, with an exception).

So, if we read a stream of Xs followed by a stream of Ys, we would have:

(allocate+register cleanup) X X X ... X (cleanup) (allocate+register cleanup) Y Y Y ... Y (cleanup)

That is, the resource that produces Xs would be released before allocating the resource that produces Ys.
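The ordering can be checked with a small experiment. This sketch repeats the bracketStream helper from above and uses a hypothetical tagged function whose "resource" is just a log entry in an IORef; no real files are involved.

```haskell
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Resource (MonadResource, allocate, release, runResourceT)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

bracketStream :: (Functor f, MonadResource m)
              => IO a -> (a -> IO ()) -> (a -> Stream f m b) -> Stream f m b
bracketStream alloc free inside = do
  (key, seed) <- lift (allocate alloc free)
  r <- inside seed
  release key
  pure r

-- Hypothetical bracketed stream: logs open/close around its elements
-- and logs each element as it is produced.
tagged :: MonadResource m => IORef [String] -> String -> [Int] -> Stream (Of Int) m ()
tagged logRef name xs =
  bracketStream
    (record ("open " ++ name))
    (\_ -> record ("close " ++ name))
    (\_ -> S.mapM (\x -> liftIO (record ("yield " ++ name)) >> pure x) (S.each xs))
  where
    record msg = modifyIORef logRef (msg :)

-- Consumes a stream of Xs followed by a stream of Ys.
demoSequence :: IO (Int, [String])
demoSequence = do
  logRef <- newIORef []
  total <- runResourceT (S.sum_ (tagged logRef "X" [1, 2] >> tagged logRef "Y" [3]))
  events <- reverse <$> readIORef logRef
  pure (total, events)
```

The log comes out as open X, yield X, yield X, close X, open Y, yield Y, close Y: the X resource is closed before the Y resource is opened.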

danidiaz
  • Thank you very much for your answer once again!! :) I was aware of the problem that the monad was too general, but didn't know what to do with that. Thanks, for your insights and additional explanations. Really appreciated. – xbalaj Feb 11 '22 at 21:06
  • @xbalaj Incidentally: the `Stream` type had something like `bracketStream` (as a `MonadCatch` instance providing a generalized `bracket`) until 0.1.4.5 https://hackage.haskell.org/package/streaming-0.1.4.5 so you didn't have to define it yourself. "Remove bracketStream, MonadCatch instance, and everything dealing with ResourceT." https://hackage.haskell.org/package/streaming-0.2.3.1/changelog Some (myself among them) were uneasy with those functions because they had some unexpected interactions with `Streaming.Prelude.take`. But perhaps they should be added back, with the appropriate warning. – danidiaz Feb 12 '22 at 08:34
  • The problem with combining `bracketStream` with `Streaming.Prelude.take` is that if you bracket a `Stream` and then `take` from it, the resource release action that `bracketStream` adds at the *end* of the `Stream` is forgotten. This means the resource will only be released during the general cleanup that happens when we leave the `ResourceT` monad. Is that bad? Usually not, but imagine that you wanted to define a stream composed of the first 10 lines taken from each file in a directory of 10000 files. You might run out of file descriptors before exiting `ResourceT`! – danidiaz Feb 12 '22 at 08:42
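The interaction with `take` described in the last comment can be reproduced with a log instead of a real file. This is a sketch only: it repeats the `bracketStream` helper from the answer, and the "resource" is a pair of hypothetical log entries in an `IORef`.

```haskell
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Resource (MonadResource, allocate, release, runResourceT)
import Data.IORef (modifyIORef, newIORef, readIORef)
import Streaming (Stream)
import qualified Streaming.Prelude as S

bracketStream :: (Functor f, MonadResource m)
              => IO a -> (a -> IO ()) -> (a -> Stream f m b) -> Stream f m b
bracketStream alloc free inside = do
  (key, seed) <- lift (allocate alloc free)
  r <- inside seed
  release key
  pure r

-- Takes two elements from a bracketed ten-element stream, then logs a
-- marker while still inside ResourceT.
demoTake :: IO ([Int], [String])
demoTake = do
  logRef <- newIORef []
  let record msg = modifyIORef logRef (msg :)
  xs <- runResourceT $ do
    firstTwo <- S.toList_ $ S.take 2 $
      bracketStream (record "open") (\_ -> record "close") (\_ -> S.each [1 .. 10 :: Int])
    -- The trailing release was dropped by take, so nothing is closed yet.
    liftIO (record "after take")
    pure firstTwo
  events <- reverse <$> readIORef logRef
  pure (xs, events)
```

Here "close" is logged only after "after take", that is, when runResourceT exits rather than when the truncated stream ends.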