I'm using the streaming
package.
I want to use result of one step defined by the S.store
as a parameter to a following step in the pipeline by preserving the constant memory. The myStream
is loaded and parsed from a file.
I have a following example that typechecks:
import qualified Streaming.Prelude as S
import qualified Data.Map.Strict as M
data A = MkA deriving (Show)
insertA :: MonadIO m => S.Stream (S.Of A) m r -> m (M.Map String Int)
insertA = undefined
insertB :: MonadIO m => M.Map String Int -> S.Stream (S.Of A) m r -> m Int
insertB = undefined
myStream :: S.Stream (S.Of A) IO r
myStream = undefined
run :: IO ()
run =
myStream
& S.store insertA
& insertB M.empty
& print
However, the line & insertB M.empty
is taking an empty map but I want to use the map from the previous step, from the insertA
function.
The insertB
function then uses this Map for a lookup.
The solution I can think of is the following:
run :: IO ()
run =
myStream
& S.store insertA
& ( \e -> do
resultMap <- S.effects e
insertB resultMap e
)
& print
Questions
Does this preserve streaming benefits like running in constant memory?
How does it solve this in the background, since the stream needs to be processed as a whole to get the Map
? It passes the same stream multiple times - loads it from a file 2 times to preserve the constant memory?
In case this would be the case (loads the file 2 times), what if the source of the stream was not from parsing a file but from some data stream that can be read only once?
Is there any other elegant solution to this problem that also holds the benefits of streaming where the next step in a pipeline needs to use the result of the previous one?