I am trying to adapt Store encoding and decoding for Streaming. Store already implements a streaming decode with a function called decodeMessageBS.

I tried a basic implementation of Store deserialization for Streaming, shown below (without bracket for now, to keep it simple). However, something seems to be wrong with the logic for popper, because decodeMessageBS keeps throwing PeekException:

{-# LANGUAGE RankNTypes #-}

import Streaming.Prelude as S hiding (print,show)
import Data.IORef
import Streaming as S
import qualified Data.ByteString as BS (ByteString,empty,length)
import System.IO.ByteBuffer
import Data.Store
import Data.Store.Streaming

streamDecode :: forall a. (Store a) => ByteBuffer -> Stream (Of BS.ByteString) IO () -> Stream (Of a) IO ()
streamDecode bb inp = do
    ref <- lift $ newIORef inp
    let popper = do
          r <- S.uncons =<< readIORef ref
          case r of
            Nothing -> return Nothing
            Just (a,rest) -> writeIORef ref rest >> return (Just a)
    let go = do
          r <- lift $ decodeMessageBS bb $ popper
          lift $ print "Decoding"
          case r of
            Nothing -> return ()
            Just msg -> (lift $ print "Message found") >> (S.yield . fromMessage $ msg) >> go
    go

I can decode my test file fine with decodeIOPortionWith, so the problem seems to be in the logic that feeds decodeMessageBS. I would appreciate pointers on what is wrong with the logic of popper here.
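
For reference, here is a minimal sketch of how the popper pattern could be checked in isolation, without Store or Streaming involved. It assumes a plain list of chunks in place of the Stream, and the names mkListPopper and drain are hypothetical helpers made up for this test, not part of any library. The idea is only to confirm that an IORef-backed popper hands out every chunk once, in order, and then keeps returning Nothing.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef)
import qualified Data.ByteString.Char8 as BC

-- Hypothetical helper: build a popper over a fixed list of chunks.
-- Each call returns the next chunk, or Nothing once the list is exhausted.
mkListPopper :: [BC.ByteString] -> IO (IO (Maybe BC.ByteString))
mkListPopper chunks = do
  ref <- newIORef chunks
  return $ do
    cs <- readIORef ref
    case cs of
      []         -> return Nothing
      (c : rest) -> writeIORef ref rest >> return (Just c)

-- Hypothetical helper: call a popper until it reports end of input,
-- collecting the chunks in the order they were produced.
drain :: IO (Maybe BC.ByteString) -> IO [BC.ByteString]
drain popper = do
  r <- popper
  case r of
    Nothing -> return []
    Just c  -> (c :) <$> drain popper

main :: IO ()
main = do
  popper <- mkListPopper (map BC.pack ["ab", "cd", "ef"])
  chunks <- drain popper
  print chunks
  -- A popper that has hit end of input should keep returning Nothing.
  again <- popper
  print again
```

If the Stream-based popper above hands out chunks the same way this list-based one does, then the popper itself is probably not the culprit, and it may be worth checking whether the bytes in the test file are in the form decodeMessageBS expects.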