
Original

I'm trying to aggregate a CSV file and experiencing [what I consider to be] excessive memory usage and/or GC effort. The issue seems to arise as the number of groups increases. There is no problem when the keys number in the hundreds or thousands, but the program quickly starts spending the majority of its time in the GC once the keys reach the tens of thousands.

Update

Moving from Data.ByteString.Lazy.ByteString to Data.ByteString.Short.ShortByteString significantly reduced the memory consumption (to a level I think is reasonable). However, the amount of time spent in the GC still seems far higher than I would expect to be necessary. I moved from Data.HashMap.Strict.HashMap to Data.HashTable.ST.Basic.HashTable to see whether mutation in ST would help, but it did not appear to. The following is the current full test code, including generateFile to create a test sample:

{-# LANGUAGE OverloadedStrings #-}

module Main where

import System.IO (withFile, IOMode(WriteMode))
import qualified System.Random as Random

import qualified Data.ByteString.Short as BSS
import qualified Data.ByteString.Lazy.Char8 as BL
import qualified Data.Vector as V
import qualified Data.Vector.Mutable as MV
import qualified Control.Monad.ST as ST

import qualified Data.HashTable.ST.Basic as HT
import qualified Data.HashTable.Class as HT (toList)
import Data.Hashable (Hashable, hashWithSalt)

import Data.List (unfoldr)

import qualified Data.Traversable as T
import Control.Monad (forM_)

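-- Orphan instance so field vectors can serve as hash-table keys; note
-- that the hash is computed by converting the vector to a list on every call.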
instance Hashable a => Hashable (V.Vector a) where
  hashWithSalt s = hashWithSalt s . V.toList

data CSVFormat = CSVFormat {
  csvSeparator :: Char,
  csvWrapper :: Char
}

readCSV :: CSVFormat -> Int -> FilePath -> IO [V.Vector BSS.ShortByteString]
readCSV format skip filepath = parseCSV format skip <$> BL.readFile filepath

parseCSV :: CSVFormat -> Int -> BL.ByteString -> [V.Vector BSS.ShortByteString]
parseCSV (CSVFormat sep wrp) skp = drop skp . unfoldr (\bs -> if BL.null bs then Nothing else Just (apfst V.fromList (parseLine bs)))
  where
    {-# INLINE apfst #-}
    apfst f (x,y) = (f x,y)

    {-# INLINE isCr #-}
    isCr c = c == '\r'

    {-# INLINE isLf #-}
    isLf c = c == '\n'

    {-# INLINE isSep #-}
    isSep c = c == sep || isLf c || isCr c

    {-# INLINE isWrp #-}
    isWrp c = c == wrp

    {-# INLINE parseLine #-}
    parseLine :: BL.ByteString -> ([BSS.ShortByteString], BL.ByteString)
    parseLine bs =
      let (field,bs') = parseField bs in
      case BL.uncons bs' of
        Just (c,bs1)
          | isLf c -> (field : [],bs1)
          | isCr c ->
              case BL.uncons bs1 of
                Just (c,bs2) | isLf c -> (field : [],bs2)
                _ -> (field : [],bs1)
          | otherwise -> apfst (field :) (parseLine bs1)
        Nothing -> (field : [],BL.empty)

    {-# INLINE parseField #-}
    parseField :: BL.ByteString -> (BSS.ShortByteString, BL.ByteString)
    parseField bs =
      case BL.uncons bs of
        Just (c,bs')
          | isWrp c -> apfst (BSS.toShort . BL.toStrict . BL.concat) (parseEscaped bs')
          | otherwise -> apfst (BSS.toShort . BL.toStrict) (BL.break isSep bs)
        Nothing -> (BSS.empty,BL.empty)

    {-# INLINE parseEscaped #-}
    parseEscaped :: BL.ByteString -> ([BL.ByteString], BL.ByteString)
    parseEscaped bs =
      let (chunk,bs') = BL.break isWrp bs in
      case BL.uncons bs' of
        Just (_,bs1) ->
          case BL.uncons bs1 of
            Just (c,bs2)
              | isWrp c -> apfst (\xs -> chunk : BL.singleton wrp : xs) (parseEscaped bs2)
              | otherwise -> (chunk : [],bs1)
            Nothing -> (chunk : [],BL.empty)
        Nothing -> error "EOF within quoted string"

aggregate :: [Int]
          -> Int
          -> [V.Vector BSS.ShortByteString]
          -> [V.Vector BSS.ShortByteString]
aggregate groups size records =
  let indices = [0..size - 1] in

  ST.runST $ do
    state <- HT.new

    forM_ records (\record -> do
        let key = V.fromList (map (\g -> record V.! g) groups)

        existing <- HT.lookup state key
        case existing of
          Just x ->
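            -- placeholder for a real aggregation step: const returns the
            -- current value unchanged and discards record V.! i unevaluated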
            forM_ indices (\i -> do
                current <- MV.read x i
                MV.write x i $! const current (record V.! i)
              )
          Nothing -> do
            x <- MV.new size
            forM_ indices (\i -> MV.write x i $! record V.! i)
            HT.insert state key x
      )

    HT.toList state >>= T.traverse V.unsafeFreeze . map snd

filedata :: IO ([Int],Int,[V.Vector BSS.ShortByteString])
filedata = do
  records <- readCSV (CSVFormat ',' '"') 1 "file.csv"
  return ([0,1,2],18,records)

main :: IO ()
main = do
  (key,len,records) <- filedata
  print (length (aggregate key len records))

generateFile :: IO ()
generateFile = do
  withFile "file.csv" WriteMode $ \handle -> do
    forM_ [0..650000] $ \_ -> do
      x <- BL.pack . show . truncate . (* 15 ) <$> (Random.randomIO :: IO Double)
      y <- BL.pack . show . truncate . (* 50 ) <$> (Random.randomIO :: IO Double)
      z <- BL.pack . show . truncate . (* 200) <$> (Random.randomIO :: IO Double)
      BL.hPut handle (BL.intercalate "," (x:y:z:replicate 15 (BL.replicate 20 ' ')))
      BL.hPut handle "\n"

I receive the following profiling result:

17,525,392,208 bytes allocated in the heap
27,394,021,360 bytes copied during GC
   285,382,192 bytes maximum residency (129 sample(s))
     3,714,296 bytes maximum slop
           831 MB total memory in use (0 MB lost due to fragmentation)

                                   Tot time (elapsed)  Avg pause  Max pause
Gen  0       577 colls,     0 par    1.576s   1.500s     0.0026s    0.0179s
Gen  1       129 colls,     0 par   25.335s  25.663s     0.1989s    0.2889s

TASKS: 3 (1 bound, 2 peak workers (2 total), using -N1)

SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

INIT    time    0.000s  (  0.002s elapsed)
MUT     time   11.965s  ( 23.939s elapsed)
GC      time   15.148s  ( 15.400s elapsed)
RP      time    0.000s  (  0.000s elapsed)
PROF    time   11.762s  ( 11.763s elapsed)
EXIT    time    0.000s  (  0.088s elapsed)
Total   time   38.922s  ( 39.429s elapsed)

Alloc rate    1,464,687,582 bytes per MUT second

Productivity  30.9% of total user, 30.5% of total elapsed

gc_alloc_block_sync: 0
whitehole_spin: 0
gen[0].sync: 0
gen[1].sync: 0

And the following heap visualization:

[heap profile image]

ryachza
  • how would these profiles compare if you use _strict_ ST monad `Control.Monad.ST.Strict`? – behzad.nouri Dec 24 '15 at 19:29
  • @behzad.nouri Thanks for the idea. Unfortunately it made exactly no difference in the heap or GC usage. Do you think there is something else that could be too lazy? I think there's something related to either what the keys are or how many there are, or it only becomes visible with certain counts or file sizes. I used to have a space leak on all files, but adding `$!` fixed that, now it's just this file and only really noticeable for large numbers of keys. – ryachza Dec 24 '15 at 19:34
  • All the update sections make this confusing to read. Could you try to put this question together as one cohesive unit? Also, please include *all* relevant code (including the actual CSV parsing code if you wrote it or a link to it otherwise) and a link to the test file. We need to be able to actually *run* your code, test it, fiddle with it, etc., if we're to have much hope of solving the problem. Without full information, I will vote to close. – dfeuer Jan 02 '16 at 17:50
  • @dfeuer I removed the original. I believe the block of code is complete and runnable. – ryachza Jan 02 '16 at 18:40

1 Answer


This turned out to be the V.! calls not being strict enough: each element of the key was stored as an unevaluated indexing thunk, and every one of those thunks kept the entire record vector alive. Replacing them with indexM, which performs the indexing when the action is bound in the monad, hugely reduced the memory consumption.
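
For reference, here is a minimal sketch of how that change might look in aggregate's record loop (a reconstruction based on the description above, not the poster's actual diff):

    forM_ records (\record -> do
        -- V.indexM :: Monad m => V.Vector a -> Int -> m a
        -- Binding the indexing in ST performs it immediately, so the key
        -- holds the field values themselves instead of thunks that each
        -- retain the entire record vector.
        key <- V.fromList <$> mapM (V.indexM record) groups

        existing <- HT.lookup state key
        case existing of
          Just x ->
            forM_ indices (\i -> do
                current <- MV.read x i
                MV.write x i $! current  -- placeholder aggregation, as before
              )
          Nothing -> do
            x <- MV.new size
            -- the copy also goes through indexM; $! still forces each field
            forM_ indices (\i -> V.indexM record i >>= \e -> MV.write x i $! e)
            HT.insert state key x
      )

This is the standard use case for indexM described in the vector documentation: it lets code be strict in the vector, avoiding a closure over it, without having to force the element itself.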

ryachza