5

While writing a deserialiser for a large (<bloblength><blob>)* encoded binary file I got stuck with the various Haskell produce-transform-consume libraries. So far I'm aware of four streaming libraries:

Here's a stripped down example of where things go wrong when I try to do Word32 streaming with conduit. A slightly more realistic example would first read a Word32 that determines the blob length and then yield a lazy ByteString of that length (which is then deserialised further). But here I just try to extract Word32's in streaming fashion from a binary file:

module Main where

-- build-depends: bytestring, conduit, conduit-extra, resourcet, binary

import           Control.Monad.Trans.Resource (MonadResource, runResourceT)
import qualified Data.Binary.Get              as G
import qualified Data.ByteString              as BS
import qualified Data.ByteString.Char8        as C
import qualified Data.ByteString.Lazy         as BL
import           Data.Conduit
import qualified Data.Conduit.Binary          as CB
import qualified Data.Conduit.List            as CL
import           Data.Word                    (Word32)
import           System.Environment           (getArgs)

-- gets a Word32 from a ByteString.
getWord32 :: C.ByteString -> Word32
getWord32 bs = do
    G.runGet G.getWord32be $ BL.fromStrict bs

-- should read BytesString and return Word32
transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
transform = do
    mbs <- await
    case mbs of
        Just bs -> do
            case C.null bs of
                False -> do
                    yield $ getWord32 bs
                    leftover $ BS.drop 4 bs
                    transform
                True -> return ()
        Nothing -> return ()

main :: IO ()
main = do
    filename <- fmap (!!0) getArgs  -- should check length getArgs
    result <- runResourceT $ (CB.sourceFile filename) $$ transform =$ CL.consume
    print $ length result   -- is always 8188 for files larger than 32752 bytes

The output of the program is just the number of Word32's that were read. It turns out the stream terminates after reading the first chunk (about 32KiB). For some reason mbs is never Nothing, so I must check null bs which stops the stream when the chunk is consumed. Clearly, my conduit transform is faulty. I see two routes to a solution:

  1. The await doesn't want to go to the second chunk of the ByteStream, so is there another function that pulls the next chunk? In examples I've seen (e.g. Conduit 101) this is not how it's done
  2. This is just the wrong way to set up transform.

How is this done properly? Is this the right way to go? (Performance does matter.)

Update: Here's a BAD way to do it using Systems.IO.Streams:

module Main where

import           Data.Word                (Word32)
import           System.Environment       (getArgs)
import           System.IO                (IOMode (ReadMode), openFile)
import qualified System.IO.Streams        as S
import           System.IO.Streams.Binary (binaryInputStream)
import           System.IO.Streams.List   (outputToList)

main :: IO ()
main = do
    filename : _ <- getArgs
    h <- openFile filename ReadMode
    s <- S.handleToInputStream h
    i <- binaryInputStream s :: IO (S.InputStream Word32)
    r <- outputToList $ S.connect i
    print $ last r

'Bad' means: Very demanding in time and space, does not handle Decode exception.

mcmayer
  • 1,931
  • 12
  • 22

3 Answers3

3

Your immediate problem is caused by how you are using leftover. That function is used to "Provide a single piece of leftover input to be consumed by the next component in the current monadic binding", and so when you give it bs before looping with transform you are effectively throwing away the rest of the bytestring (i.e. what is after bs).

A correct solution based on your code would use the incremental input interface of Data.Binary.Get to replace your yield/leftover combination with something that consumes each chunk fully. A more pragmatic approach, though, is using the binary-conduit package, which provides that in the shape of conduitGet (its source gives a good idea of what a "manual" implementation would look like):

import           Data.Conduit.Serialization.Binary

-- etc.

transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
transform = conduitGet G.getWord32be

One caveat is that this will throw a parse error if the total number of bytes is not a multiple of 4 (i.e. the last Word32 is incomplete). In the unlikely case of that not being what you want, a lazy way out would be simply using \bs -> C.take (4 * truncate (C.length bs / 4)) bs on the input bytestring.

duplode
  • 33,731
  • 7
  • 79
  • 150
  • Oh I see, "... in the current monadic binding", that explains it. Thank you. I plugged in the `transform` as you suggest and it works, but it consumes vast amounts of memory (~500MB memory for a 35MB file). It seems the laziness is lost when using `Data.Conduit.List`. – mcmayer Oct 09 '16 at 07:22
  • @mcmayer That's pretty much it. The rule of thumb is that when using streaming libraries you shouldn't collect your output in a list (as with `outputToList` in the bad *io-streams* example you added in the edit), as you really don't want that to happen if there are lots of output. Rather, you should use the appropriate streaming consumer (or "sink", in the *conduit* terminology). Michael's answer shows the gist of doing that with *pipes* (the consumer being `P.print` in his demo); a *conduit* solution would be analogous. – duplode Oct 09 '16 at 11:50
3

With pipes (and pipes-group and pipes-bytestring) the demo problem reduces to combinators. First we resolve the incoming undifferentiated byte stream into little 4 byte chunks:

chunksOfStrict :: (Monad m) => Int -> Producer ByteString m r -> Producer ByteString m r
chunksOfStrict n = folds mappend mempty id . view (Bytes.chunksOf n) 

then we map these to Word32s and (here) count them.

main :: IO ()
main = do
   filename:_ <- getArgs
   IO.withFile filename IO.ReadMode $ \h -> do
     n <- P.length $ chunksOfStrict 4 (Bytes.fromHandle h) >-> P.map getWord32
     print n

This will fail if we have less than 4 bytes or otherwise fail to parse but we can as well map with

getMaybeWord32 :: ByteString -> Maybe Word32
getMaybeWord32 bs = case  G.runGetOrFail G.getWord32be $ BL.fromStrict bs of
  Left r -> Nothing
  Right (_, off, w32) -> Just w32

The following program will then print the parses for the valid 4 byte sequences

main :: IO ()
main = do
   filename:_ <- getArgs
   IO.withFile filename IO.ReadMode $ \h -> do
     runEffect $ chunksOfStrict 4 (Bytes.fromHandle h) 
                 >-> P.map getMaybeWord32
                 >-> P.concat  -- here `concat` eliminates maybes
                 >-> P.print 

There are other ways of dealing with failed parses, of course.

Here, though, is something closer to the program you asked for. It takes a four byte segment from a byte stream (Producer ByteString m r) and reads it as a Word32 if it is long enough; it then takes that many of the incoming bytes and accumulates them into a lazy bytestring, yielding it. It just repeats this until it runs out of bytes. In main below, I print each yielded lazy bytestring that is produced:

module Main (main) where 
import Pipes 
import qualified Pipes.Prelude as P
import Pipes.Group (folds) 
import qualified Pipes.ByteString as Bytes ( splitAt, fromHandle, chunksOf )
import Control.Lens ( view ) -- or Lens.Simple (view) -- or Lens.Micro ((.^))
import qualified System.IO as IO ( IOMode(ReadMode), withFile )
import qualified Data.Binary.Get as G ( runGet, getWord32be )
import Data.ByteString ( ByteString )
import qualified Data.ByteString.Lazy.Char8 as BL 
import System.Environment ( getArgs )

splitLazy :: (Monad m, Integral n) =>
   n -> Producer ByteString m r -> m (BL.ByteString, Producer ByteString m r)
splitLazy n bs = do
  (bss, rest) <- P.toListM' $ view (Bytes.splitAt n) bs
  return (BL.fromChunks bss, rest)

measureChunks :: Monad m => Producer ByteString m r -> Producer BL.ByteString m r
measureChunks bs = do
 (lbs, rest) <- lift $ splitLazy 4 bs
 if BL.length lbs /= 4
   then rest >-> P.drain -- in fact it will be empty
   else do
     let w32 = G.runGet G.getWord32be lbs
     (lbs', rest') <- lift $ splitLazy w32 bs
     yield lbs
     measureChunks rest

main :: IO ()
main = do
  filename:_ <- getArgs
  IO.withFile filename IO.ReadMode $ \h -> do
     runEffect $ measureChunks (Bytes.fromHandle h) >-> P.print

This is again crude in that it uses runGet not runGetOrFail, but this is easily repaired. The pipes standard procedure would be to stop the stream transformation on a failed parse and return the unparsed bytestream.

If you were anticipating that the Word32s were for large numbers, so that you did not want to accumulate the corresponding stream of bytes as a lazy bytestring, but say write them to different files without accumulating, we could change the program pretty easily to do that. This would require a sophisticated use of conduit but is the preferred approach with pipes and streaming.

Michael
  • 2,889
  • 17
  • 16
1

Here's a relatively straightforward solution that I want to throw into the ring. It's a repeated use of splitAt wrapped into a State monad that gives an interface identical to (a subset of) Data.Binary.Get. The resulting [ByteString] is obtained in main with a whileJust over getBlob.

module Main (main) where

import           Control.Monad.Loops
import           Control.Monad.State
import qualified Data.Binary.Get      as G (getWord32be, runGet)
import qualified Data.ByteString.Lazy as BL
import           Data.Int             (Int64)
import           Data.Word            (Word32)
import           System.Environment   (getArgs)

-- this is going to mimic the Data.Binary.Get.Get Monad
type Get = State BL.ByteString

getWord32be :: Get (Maybe Word32)
getWord32be = state $ \bs -> do
    let (w, rest) = BL.splitAt 4 bs
    case BL.length w of
        4 -> (Just w', rest) where
            w' = G.runGet G.getWord32be w
        _ -> (Nothing, BL.empty)

getLazyByteString :: Int64 -> Get BL.ByteString
getLazyByteString n = state $ \bs -> BL.splitAt n bs

getBlob :: Get (Maybe BL.ByteString)
getBlob = do
    ml <- getWord32be
    case ml of
        Nothing -> return Nothing
        Just l -> do
            blob <- getLazyByteString (fromIntegral l :: Int64)
            return $ Just blob

runGet :: Get a -> BL.ByteString -> a
runGet g bs = fst $ runState g bs

main :: IO ()
main = do
    fname <- head <$> getArgs
    bs <- BL.readFile fname
    let ls = runGet loop bs where
        loop = whileJust getBlob return
    print $ length ls

There's no error handling in getBlob, but it's easy to extend. Time and space complexity is quite good, as long as the resulting list is used carefully. (The python script that creates some random data for consumption by the above is here).

mcmayer
  • 1,931
  • 12
  • 22
  • Right, this is conceptually very close to what you would do with `pipes-parse` which uses `StateT (Producer ByteString m x) m r` to consume a byte stream (= `Producer ByteString m r`) bit by bit. If you change this a bit, you might end up accumulating the list of lazy bytestrings again. – Michael Oct 10 '16 at 14:17
  • Note also that you are sort of re-writing `unfoldr`, which takes an argument close to the one you have hidden in `State`, `(s -> Maybe (a, s)) -> s -> [a])`. `unfoldr` corresponds to this mix of `whileMaybe` and `State`. Here's a pipes equivalent that expressly uses `Pipes.Prelude.unfoldr` (which uses Either instead of Maybe to keep the final return value) sprunge.us/GcjM You can see the features of the step function match the ones you are writing, modulo various pipes features. – Michael Oct 10 '16 at 14:43
  • Here's a variant of the unfoldr approach using the `streaming` + `streaming-bytestring` libraries, which just isolate this non-piping aspect of pipes and make it easier to use. http://sprunge.us/QKCQ `streaming-bytestring` is supposed to be as close to a lazy bytestring as can be, while properly streaming, with IO between the internal chunks. It would be very easy to alter this so that the segments are not accumulated as lazy bytestrings but can be separately written chunkwise to separate files or sent out some other way. – Michael Oct 10 '16 at 14:47
  • What really puzzles me - with all approaches - is why this does not perform near the limitation of disk I/O. Disk is the bottleneck, the computations themselves are rather trivial. So far my explanation is that the GC shuffles around vast amounts of RAM... (The profiler says so.) But I find this conclusion odd. Outside GPUs I've never had RAM speed as the bottleneck. – mcmayer Oct 10 '16 at 15:31
  • What arguments are you passing to ghc? What is the file like that you are reading? – Michael Oct 10 '16 at 15:41
  • A program like yours, which basically counts the number of segments in a long file, could certainly be written much more cheaply with a simple loop and hSeek or the like. So could a program that e.g. writes each segment to a different file. The streaming IO libraries accept different costs in order to make this all very plastic and high level. Pipes and conduit wrap each chunk in a free monad step; io-streams puts each chunk in an IORef before proceeding, etc. etc. – Michael Oct 10 '16 at 15:54
  • Yes, I've been thinking about going more low level to avoid all the overhead. I use `-threaded -O2`. The data is a concatenation of 'blobs' of random content and random size in 1..256. For example, with my `whileJust` `splitAt` version I see 10GB bytes allocated in the heap for a 1GB file - neither disk nor CPU are anywhere near max - very weird. (GC is actually not that busy - 12 MB "copied during GC") – mcmayer Oct 10 '16 at 16:11
  • The `pipes` and `streaming` programs do seem to be doing pretty much the same as your state+lazy bytestring program. I tried them all with a 2gb file I arranged to contain 63471 'lazy bytestrings', and got very close to 0m0.487s for each (with `-O2` but no `-threaded`). They strike me as comparatively 'normal' and well behaved. – Michael Oct 10 '16 at 20:12
  • I get a similar result for long lazy bytestrings. But when I make them [shorter](http://sprunge.us/ABWJ), e.g. 100 bytes long, the performance degrades significantly. I have the [impression](http://sprunge.us/CNNX) that the performance correlates with the number of bytes allocated in the heap. – mcmayer Oct 11 '16 at 15:24
  • I'm not knowledgeable about the meaning of the total bytes allocated figure. If I make my 2 gigabyte file out of fragments 100 bytes long, then program isolates 20000000 fragments, which is 315 times as many fragments, but takes 10 times as long. There is a lot more calculation, of course. I don't think there's a whole lot more in memory at each moment. Lazy bytestring readFile & co make a new 'foreign pointer' for each 32k chunk. The "parsing" doesn't make new chunks but at each step defines different offsets. At a boundary it will need to make a 2 chunk bytestring, no more I think. – Michael Oct 12 '16 at 20:00
  • Here is some recent discussion of profiling statistics http://stackoverflow.com/questions/40044364/making-sense-from-ghc-profiler from hyper-knowledgeable people. – Michael Oct 15 '16 at 01:03