I have a conduit pipeline processing a long file. I want to print a progress report for the user every 1000 records, so I've written this:
-- | Every n records, perform the IO action.
-- Used for progress reports to the user.
progress :: (MonadIO m) => Int -> (Int -> i -> IO ()) -> Conduit i m i
progress n act = skipN n 1
where
skipN c t = do
mv <- await
case mv of
Nothing -> return ()
Just v ->
if c <= 1
then do
liftIO $ act t v
yield v
skipN n (succ t)
else do
yield v
skipN (pred c) (succ t)
No matter what action I call this with, it leaks memory, even if I just tell it to print a full stop.
As far as I can see the function is tail recursive and both counters are regularly forced (I tried putting "seq c" and "seq t" in, to no avail). Any clue?
If I put in an "awaitForever" that prints a report for every record then it works fine.
Update 1: This occurs only when compiled with -O2. Profiling indicates that the leaking memory is allocated in the recursive "skipN" function and being retained by "SYSTEM" (whatever that means).
Update 2: I've managed to cure it, at least in the context of my current program. I've replaced the function above with this. Note that "proc" is of type "Int -> Int -> Maybe i -> m ()": to use it you call "await" and pass it the result. For some reason swapping over the "await" and "yield" solved the problem. So now it awaits the next input before yielding the previous result.
-- | Every n records, perform the monadic action.
-- Used for progress reports to the user.
progress :: (MonadIO m) => Int -> (Int -> i -> IO ()) -> Conduit i m i
progress n act = await >>= proc 1 n
where
proc c t = seq c $ seq t $ maybe (return ()) $ \v ->
if c <= 1
then {-# SCC "progress.then" #-} do
liftIO $ act t v
v1 <- await
yield v
proc n (succ t) v1
else {-# SCC "progress.else" #-} do
v1 <- await
yield v
proc (pred c) (succ t) v1
So if you have a memory leak in a Conduit, try swapping the yield and await actions.