
I'm having trouble directing flow through a pipeline with haskell-pipes. Basically, I analyze a bunch of files and then I have to either

  1. print results to the terminal in a human-friendly way
  2. encode results to JSON

The chosen path depends upon a command line option.
In the second case, I have to output an opening bracket, then the incoming values separated by commas, and finally a closing bracket. Currently insertCommas never terminates, so the closing bracket is never output.
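With three encoded results, for example, the output should look like [r1,r2,r3].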

import Pipes
import qualified Pipes.Prelude as P
import qualified Data.ByteString.Lazy as B
import Data.Aeson (encode)

insertCommas :: Consumer B.ByteString IO ()
insertCommas = do
    first <- await
    lift $ B.putStr first
    for cat $ \obj -> lift $ do
        putStr ","
        B.putStr obj

jsonExporter :: Consumer (FilePath, AnalysisResult) IO ()
jsonExporter = do
    lift $ putStr "["
    P.map encode >-> insertCommas
    lift $ putStr "]"

exportStream :: Config -> Consumer (FilePath, AnalysisResult) IO ()
exportStream conf =
    case outputMode conf of
      JSON -> jsonExporter
      _    -> P.map (export conf) >-> P.stdoutLn

main :: IO ()
main = do
    -- The first two lines are Docopt stuff, not relevant
    args <- parseArgsOrExit patterns =<< getArgs
    ins  <- allFiles $ args `getAllArgs` argument "paths"
    let conf = readConfig args
    runEffect $ each ins
             >-> P.mapM analyze
             >-> P.map (filterResults conf)
             >-> P.filter filterNulls
             >-> exportStream conf
rubik

2 Answers


AFAIK a Consumer cannot detect the end of a stream. To do that you need a Parser from Pipes.Parse and to invert the control.

Here is a Parser which inserts commas between String elements:

import Pipes
import qualified Pipes.Prelude as P
import Pipes.Parse (draw, evalStateT)

commify = do
  lift $ putStrLn "["
  m1 <- draw                        -- Nothing means the stream is empty
  case m1 of
    Nothing -> lift $ putStrLn "]"
    Just x1 -> do
      lift $ putStrLn x1            -- first element: no leading comma
      let loop = do mx <- draw
                    case mx of
                      Nothing -> lift $ putStrLn "]"
                      Just x  -> lift (putStr "," >> putStrLn x) >> loop
      loop

test1 = evalStateT commify ( mapM_ yield (words "this is a test") )
test2 = evalStateT commify P.stdinLn
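
For reference, running test1 prints one element per line, with the comma attached to every element after the first:

    >>> test1
    [
    this
    ,is
    ,a
    ,test
    ]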

To handle the different output formats I would probably make both formats a Parser:

exportParser = do
  mx <- draw
  case mx of
    Nothing -> return ()
    Just x  -> (lift $ putStrLn $ export x) >> exportParser

and then:

let parser = case outputMode conf of
               JSON -> commify
               _    -> exportParser
evalStateT parser $ each ins
                >-> P.mapM analyze
                >-> P.map (filterResults conf)
                >-> P.filter filterNulls

There is probably a slicker way to write exportParser in terms of foldAllM. You can also use the MaybeT transformer to write the commify parser more succinctly. I've written both out explicitly to make them easier to understand.
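
For instance, here is a minimal sketch of both variants (commifyM and printAll are names I made up, not from the answer; String elements assumed, as in commify above):

{-# LANGUAGE RankNTypes #-}
import Control.Monad (forever, void)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Maybe (MaybeT(..), runMaybeT)
import Pipes.Parse (Parser, draw, foldAllM)

-- commify via MaybeT: 'MaybeT draw' aborts the whole block as soon
-- as the stream is exhausted, so no explicit case analysis is needed.
commifyM :: Parser String IO ()
commifyM = do
  lift $ putStrLn "["
  void . runMaybeT $ do
    x1 <- MaybeT draw                         -- Nothing ends the block
    lift . lift $ putStrLn x1
    forever $ do
      x <- MaybeT draw
      lift . lift $ putStr "," >> putStrLn x
  lift $ putStrLn "]"

-- exportParser via foldAllM: fold over the whole stream, running an
-- action (here print) on each element as it is drawn.
printAll :: Show a => Parser a IO ()
printAll = foldAllM (\() x -> print x) (return ()) return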

ErikR
  • You are correct, a downstream proxy can never detect that an upstream proxy `return`ed. When the upstream proxy `return`s, the pipes library has no possible value to pass to the downstream proxy's `Request` to run it any farther. – Cirdec Nov 01 '15 at 18:20
  • This is great, thank you so much for the help! I'll now refactor with `foldAllM` and especially `MaybeT` to understand how transformers work. – rubik Nov 01 '15 at 19:38

I think you should 'commify' with pipes-group. It has an intercalates but not an intersperse, though that's not a big deal to write. For this sort of problem, I think you should stay away from the Consumer end.

{-# LANGUAGE OverloadedStrings #-}
import Pipes
import qualified Pipes.Prelude as P
import qualified Data.ByteString.Lazy.Char8 as B
import Pipes.Group
import Lens.Simple  -- or Control.Lens or Lens.Micro or anything with view/^.
import System.Environment

intersperse_ :: Monad m => a -> Producer a m r -> Producer a m r
intersperse_ a producer = intercalates (yield a) (producer ^. chunksOf 1) 

main = do 
  args <- getArgs
  let op prod = case args of 
        "json":_ -> yield "[" *> intersperse_ "," prod <* yield "]"
        _        -> intersperse_ " " prod
  runEffect $ op producer >-> P.mapM_ B.putStr
  putStrLn ""
  where 
    producer = mapM_ yield (B.words "this is a test")

which gives me this:

    >>> :main json
    [this,is,a,test]
    >>> :main ---
    this is a test
Michael
  • Interesting! But is there a way to make `intersperse_` a Pipe, in order to maintain the pipeline "structure"? Because `intersperse_` accepts a producer as an argument, I'm forced to build the first part of the pipeline separately and then pass it as an argument. And I'm having trouble integrating it with my current code, as it does not typecheck. – rubik Nov 02 '15 at 07:24
  • OK, I made it typecheck, but I'd still prefer a "linear" pipeline structure if possible. Looking at the source code of `intercalates`, it doesn't seem possible, does it? – rubik Nov 02 '15 at 07:47
  • The short answer is no. `pipes` has several ways of manipulating a `Producer`; piping is just one of them. `pipes-parse` and `pipes-group` are about other ways of manipulating a producer. Thus, dividing a producer into chunks of three is not a piping problem, and can't even be expressed with the other streaming IO libraries, which would force you, e.g., to make a stream of 3-element lists or the like. But `Pipes.Group.chunksOf` is the native expression of it. – Michael Nov 02 '15 at 13:17
  • Similarly, the very elementary problem of splitting a producer at the 23rd element and reserving the rest for another treatment is not a piping problem; it is expressed natively by `Pipes.Parse.splitAt :: Int -> Producer a m r -> Producer a m (Producer a m r)` (specializing). In conduit this sort of problem involves the spectacular apparatus of 'connect' and 'resume'. – Michael Nov 02 '15 at 13:23
  • Similarly, dividing a text or bytestring producer on its lines is not a piping problem; in order to express it as one, conduit and io-streams unstream the lines themselves, concatenating chunks and allowing infinitely long strict bytestrings to be formed in memory (to put it in a paranoid sort of way.) With `pipes` this isn't an issue since it recognizes that there are other things to do with a producer besides `pipe` it. – Michael Nov 02 '15 at 13:35
  • Great info there, thanks a lot for the insights about haskell-pipes and the streaming problem in general. The context is clearer now. – rubik Nov 02 '15 at 15:26
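
To make the splitAt point from these comments concrete, here is a minimal, hypothetical sketch (mine, not from the thread; in pipes-parse 3 splitAt is a lens, so it is viewed with ^. as in the answer above):

import Pipes
import qualified Pipes.Prelude as P
import qualified Pipes.Parse as PP
import Lens.Simple  -- for (^.), as in the answer above

-- Split a producer after its 3rd element; the leftovers come back as
-- the return value of the first producer.
main :: IO ()
main = do
  let split = each [1 .. 10 :: Int] ^. PP.splitAt 3
  rest <- runEffect $ split >-> P.print            -- prints 1, 2, 3
  runEffect $ rest >-> P.map (* 100) >-> P.print   -- prints 400 .. 1000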