
I'm trying to dispatch the items from a conduit into many output files. The problem is very similar to "Conduit - Multiple output file within the pipeline", with a few differences:

  • In the previous solution, every sink has a filter that decides whether an element belongs to that sink. In my case, every element coming from upstream goes to exactly one file, and when there are many files it would be better to decide which file it goes to with a single operation.

  • The files are created on demand. A "selector" function decides which sink the next element is going to, and if it doesn't exist yet it creates it using a "create new sink" function.

For example, if the Source yields: 8 4 7 1 5
and the sink selector is modulo 3, then the sequence of actions would be:

Create file 2
Add 8 to file 2
Create file 1
Add 4 to file 1
Add 7 to file 1
Add 1 to file 1
Add 5 to file 2
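
The sequence above can be reproduced with a small pure simulation that uses no conduit at all. This is only a sketch to pin down the intended behaviour: dispatchLog is a hypothetical helper (not part of any library) that remembers which keys have already been seen and emits a "Create" action the first time a key appears.

```haskell
import qualified Data.Set as S

-- Hypothetical helper: lists the actions a dispatcher would perform,
-- creating each file the first time its key shows up.
dispatchLog :: (Ord k, Show k, Show a) => (a -> k) -> [a] -> [String]
dispatchLog selector = go S.empty
  where
    go _ [] = []
    go seen (x : xs)
      | k `S.member` seen = add : go seen xs
      | otherwise = create : add : go (S.insert k seen) xs
      where
        k = selector x
        create = "Create file " ++ show k
        add = "Add " ++ show x ++ " to file " ++ show k
```

Calling dispatchLog (`mod` 3) [8, 4, 7, 1, 5] yields the seven actions listed above, in the same order.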

I'm thinking of a type for this dispatcher like this:

        dispatcherSink_ :: (Monad m) => 
                           (a -> k) ->               -- sink selector
                           (k -> Sink a m ()) ->     -- new sink
                           Sink a m ()

I've tried to write the function using evalStateC with an internal StateT holding a Map of Sinks, but I'm not able to tie up the types. I'm not sure if you can even use the same sink twice.

Is what I'm trying to do even possible?

I'm still a newbie in Haskell, so any help will be appreciated.

Edited

I thought I could create a map of ResumableSinks. There is a library on Hackage for that, but it depends on an old and very specific version of Conduit, so cabal couldn't install it. In the end I didn't find a way to write the function with the previous type, able to work with any sink, so I came up with a function that works with files directly:

import System.IO (hClose,openFile,IOMode(WriteMode))
import Conduit
import Data.IOData
import qualified Data.Foldable as F
import qualified Data.Map.Strict as M
import Control.Monad.State.Strict
import Data.ByteString.Char8 (pack)


fileDispatcherSink ::
  (MonadIO m, IOData c,Ord k) =>
  (a -> k) ->
  (a -> c) ->
  (k -> FilePath) ->
  Sink a m ()
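fileDispatcherSink selector toChunked path =
  -- The state is a Map from key to the already opened Handle for that key.
  evalStateC M.empty dispatcher
  where
    dispatcher = do
      next <- await
      m <- get
      case next of
        -- Upstream is exhausted: close every handle that was opened.
        Nothing -> liftIO $ F.traverse_ hClose m
        Just a -> do
          let k = selector a
          -- Reuse the handle for this key, or open the file on first use.
          h <- case M.lookup k m of
                Nothing -> do
                  nh <- liftIO $ openFile (path k) WriteMode
                  put $ M.insert k nh m
                  return nh
                Just h -> return h
          -- Write this single element to the selected file.
          yield (toChunked a) $$ sinkHandle h
          dispatcher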

testSource :: (Monad m) => Source m Int
testSource = yieldMany [8, 4, 7, 1, 5]

main :: IO ()
main = testSource
       $$ fileDispatcherSink (`mod` 3) (pack . show) ((++ ".txt") . show)

Is there a way to write the dispatcherSink_ function?

jeroko

1 Answer


There is a conceptual problem with implementing a function of this type:

dispatcherSink_ :: (Monad m) => 
                   (a -> k) ->               -- sink selector
                   (k -> Sink a m ()) ->     -- new sink
                   Sink a m ()

In conduit, data is pulled from upstream by downstream, instead of being pushed. A Sink decides for itself whether to request the next input value from its upstream conduit. So you can't really keep a map of Sinks, read an input value, and then feed it to one of them. The Sink you select might not read the input value at all, it might decide to finish, and then what will you do with the input value? You could create a new sink for that key, but it too can decide not to accept the input.
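
To make the pull model concrete, here is a minimal sketch with two hypothetical sinks, ignoresInput and readsOne (names invented for this illustration). It assumes the Sink synonym and the $$ operator used in the question, which newer conduit releases mark as deprecated. Both sinks are connected to the same three-element source, yet neither is obliged to consume everything it is offered.

```haskell
import Data.Conduit (Sink, await, ($$))
import qualified Data.Conduit.List as CL

-- Hypothetical sink that finishes immediately and never calls await.
ignoresInput :: Monad m => Sink Int m String
ignoresInput = return "done, nothing read"

-- Hypothetical sink that pulls exactly one value and then stops.
readsOne :: Monad m => Sink Int m (Maybe Int)
readsOne = await

-- Run both sinks against the same source and collect their results.
demoPull :: IO (String, Maybe Int)
demoPull = do
  r1 <- CL.sourceList [8, 4, 7] $$ ignoresInput
  r2 <- CL.sourceList [8, 4, 7] $$ readsOne
  return (r1, r2)
```

Nothing in the types tells a dispatcher in advance which of these behaviours a given Sink will have, which is why handing it a value "from outside" does not fit the model.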

So instead of a Sink you'll most likely need a different concept: something you can push a value to and that you can also finalize. An idea (untested):

data PushSink m i = PushSink { psPush :: i -> m (PushSink m i)
                             , psFinalize :: m () }

An implementation for writing files would open a file, keep the handle, and psPush would just write a chunk into the file, returning the same object, while psFinalize would close the file.
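
A minimal sketch of such a file-backed implementation might look like the following. The name filePushSink, the choice of strict ByteString chunks, and the restriction to IO are assumptions made for this illustration. The PushSink type is repeated so that the snippet stands alone.

```haskell
import qualified Data.ByteString.Char8 as B
import System.IO (IOMode (WriteMode), hClose, openFile)

-- The PushSink type from the answer, repeated so the sketch stands alone.
data PushSink m i = PushSink
  { psPush :: i -> m (PushSink m i)
  , psFinalize :: m ()
  }

-- Hypothetical constructor: opens the file once and keeps the handle.
-- Every push writes one chunk and returns the same PushSink again.
filePushSink :: FilePath -> IO (PushSink IO B.ByteString)
filePushSink path = do
  h <- openFile path WriteMode
  let self = PushSink
        { psPush = \chunk -> B.hPut h chunk >> return self
        , psFinalize = hClose h
        }
  return self
```

Note that this sketch does nothing about exceptions: if a push fails, the handle stays open unless the caller arranges for psFinalize to run.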

And then you can implement a variant like this

dispatcherSink_ :: (Monad m) => 
                   (a -> k) ->                 -- sink selector
                   (k -> m (PushSink m a)) ->  -- new sink
                   Sink a m ()

which pushes values to PushSinks and finalizes them all once the input is exhausted.
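
One possible way to write this variant is sketched below. It is a sketch under assumptions rather than a tested implementation: it adds an Ord k constraint (needed to keep the PushSinks in a Data.Map), it uses the Sink synonym and $$ from the question's era of conduit, and loggingSink and demo are hypothetical helpers that record actions in an IORef instead of touching the file system.

```haskell
import Control.Monad.Trans.Class (lift)
import Data.Conduit (Sink, await, ($$))
import qualified Data.Conduit.List as CL
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)
import qualified Data.Map.Strict as M

data PushSink m i = PushSink
  { psPush :: i -> m (PushSink m i)
  , psFinalize :: m ()
  }

dispatcherSink_ ::
  (Monad m, Ord k) =>
  (a -> k) ->                 -- sink selector
  (k -> m (PushSink m a)) ->  -- new sink
  Sink a m ()
dispatcherSink_ selector newSink = go M.empty
  where
    go sinks = do
      next <- await
      case next of
        -- No more input: finalize every PushSink created so far.
        Nothing -> lift (mapM_ psFinalize sinks)
        Just a -> do
          let k = selector a
          -- Look up the PushSink for this key, creating it on first use.
          sink <- maybe (lift (newSink k)) return (M.lookup k sinks)
          -- Push the value and remember the (possibly updated) PushSink.
          sink' <- lift (psPush sink a)
          go (M.insert k sink' sinks)

-- Hypothetical PushSink for the demo: logs its actions to an IORef.
loggingSink :: IORef [String] -> Int -> IO (PushSink IO Int)
loggingSink ref k = do
  modifyIORef ref (("Create file " ++ show k) :)
  let self = PushSink
        { psPush = \x -> do
            modifyIORef ref (("Add " ++ show x ++ " to file " ++ show k) :)
            return self
        , psFinalize = modifyIORef ref (("Close file " ++ show k) :)
        }
  return self

-- Runs the question's example input through the dispatcher.
demo :: IO [String]
demo = do
  ref <- newIORef []
  CL.sourceList [8, 4, 7, 1, 5] $$ dispatcherSink_ (`mod` 3) (loggingSink ref)
  reverse <$> readIORef ref
```

With the question's input, demo records the create and add actions in the order given in the question, followed by one close action per key. Because the dispatcher itself does all the awaiting, the question of whether a selected sink will accept a value never arises: psPush always takes one.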

Petr
  • Thanks @PetrPudlák, that is an interesting approach. I have the feeling that what I'm trying to do would fit better a kind of "reactive conduit", if there is anything like that. In this case, how does the zipSinks function work? I'm afraid the source code for it is still beyond my reach. – jeroko Aug 05 '14 at 16:41