I'm trying to dispatch the items from a conduit into many output files, the problem is very similar to Conduit - Multiple output file within the pipeline, with a few differences:
In the previous solution, every sink has a filter that decides if the element belongs to that sink or not. In my case every element coming from the upstream goes exactly to one file, and in the case where there is a big number of files it would be better to make only one operation to decide to which file is it going.
The files are created on demand. A "selector" function decides which sink the next element is going to, and if it doesn't exist yet it creates it using a "create new sink" function.
For example if the Source yields: 8 4 7 1 5
And the sink selector is a module 3, then the sequence of actions would be:
Create file 2
Add 8 to file 2
Create file 1
Add 4 to file 1
Add 7 to file 1
Add 1 to file 1
Add 5 to file 2
I'm thinking of a type for this dispatcher like this:
dispatcherSink_ :: (Monad m) =>
(a -> k) -> -- sink selector
(k -> Sink a m ()) -> -- new sink
Sink a m ()
I've tried to write the function using evalStateC with an internal StateT holding a Map of Sinks, but I'm not able to tie up the types. I'm not sure if you can even use the same sink twice.
Is what I'm trying to do even possible?
I'm still a newbie in Haskell, so any help will be appreciated.
Edited
I though I could create a map of ResumableSinks, there is a library in Hackage for that, but it depends on an old and very specific version of Conduit, so cabal couldn't install it. In the end I didn't find a way to write the function with the previous type, able to work with any sink, so I came up with a function that works with files directly:
import System.IO (hClose,openFile,IOMode(WriteMode))
import Conduit
import Data.IOData
import qualified Data.Foldable as F
import qualified Data.Map.Strict as M
import Control.Monad.State.Strict
import Data.ByteString.Char8 (pack)
fileDispatcherSink ::
(MonadIO m, IOData c,Ord k) =>
(a -> k) ->
(a -> c) ->
(k -> FilePath) ->
Sink a m ()
fileDispatcherSink selector toChunked path =
evalStateC M.empty $ dispatcher
where
dispatcher = do
next <- await
m <- get
case next of
Nothing -> liftIO $ F.traverse_ hClose m
Just a -> do
let k = selector a
h <- case M.lookup k m of
Nothing -> do
nh <- liftIO $ openFile (path k) WriteMode
put $ M.insert k nh m
return nh
Just h -> return h
yield (toChunked a) $$ sinkHandle h
dispatcher
testSource :: (Monad m) => Source m Int
testSource = yieldMany [8, 4, 7, 1, 5]
main :: IO ()
main = testSource
$$ fileDispatcherSink (`mod` 3) (pack . show) ((++ ".txt") . show)
Is there a way to write the _dispatcherSink__ function?