It is possible to pull on demand from a number (say two for simplicity) of sources using streams (lazy lists). Iteratees can be used to process data coming from a single source.

Is there an Iteratee-like functional concept for processing multiple input sources? I could imagine an Iteratee whose state signals which source it wants to pull from next.

ron
  • so you have two sources and you want to get events from whichever has them available first? – Kim Stebel Sep 19 '12 at 14:31
  • Any way of pulling from multiple sources is probably going to be equivalent to some way of combining two sources into one. There are many ways this could be done; could you clarify what behavior you're looking for? – C. A. McCann Sep 19 '12 at 14:57
  • I thought about something general, where the sink can control, based on its state, which source to pull from. The specific scenario would be to merge two sorted streams, so that I get tuples of elements whose keys are equal, and unpairable elements get dropped. – ron Sep 19 '12 at 15:19
  • Any iteratee library can do this by simply stacking monads. Oleg showed how first (http://okmij.org/ftp/Streams.html#2enum1iter). I've used this with my own iteratee package. It'll work with any of them. – John L Sep 20 '12 at 03:11
  • @John: Thanks! By the way I recall having already heard the phrase "Oleg showed first" :) – ron Sep 20 '12 at 21:22

4 Answers

To do this using pipes you nest the Pipe monad transformer within itself, once for each producer you wish to interact with. For example:

import Control.Monad
import Control.Monad.Trans
import Control.Pipe

producerA, producerB :: (Monad m) => Producer Int m ()
producerA = mapM_ yield [1,2,3]
producerB = mapM_ yield [4,5,6]

consumes2 :: (Show a, Show b) =>
    Consumer a (Consumer b IO) r
consumes2 = forever $ do
    a <- await       -- await from outer producer
    b <- lift await  -- await from inner producer
    lift $ lift $ print (a, b)

Just like a Haskell curried function of multiple variables, you partially apply it to each source using composition and runPipe:

consumes1 :: (Show b) => Consumer b IO ()
consumes1 = runPipe $ consumes2 <+< producerA

fullyApplied :: IO ()
fullyApplied = runPipe $ consumes1 <+< producerB

Running fullyApplied prints:

>>> fullyApplied
(1,4)
(2,5)
(3,6)

This trick works for yielding to or awaiting from any number of pipes, upstream or downstream. It also works for proxies, the bidirectional analogs of pipes.

Edit: Note that this also works for any iteratee library, not just pipes. In fact, John Millikin and Oleg were the original advocates of this approach and I just stole the idea from them.
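The Control.Pipe module used above belongs to an early pipes release. As a rough sketch, and assuming the pipes 4.x API (the Pipes module with each, >-> and runEffect), the same nesting might look like the following. The names are carried over from the example above; the translation is an illustration rather than part of the original answer.

```haskell
import Control.Monad (forever)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Class (lift)
import Pipes (Consumer, Producer, await, each, runEffect, (>->))

producerA, producerB :: Monad m => Producer Int m ()
producerA = each [1, 2, 3]
producerB = each [4, 5, 6]

-- The outer layer consumes values of type a; its base monad is itself
-- a Consumer of b, so the inner source is reached with `lift`.
consumes2 :: (Show a, Show b) => Consumer a (Consumer b IO) r
consumes2 = forever $ do
    a <- await         -- await from the outer producer
    b <- lift await    -- await from the inner producer
    liftIO $ print (a, b)

-- Partially apply: connect the outer layer, leaving a Consumer of b.
consumes1 :: Show b => Consumer b IO ()
consumes1 = runEffect $ producerA >-> consumes2

-- Fully apply: connect the remaining layer.
fullyApplied :: IO ()
fullyApplied = runEffect $ producerB >-> consumes1

main :: IO ()
main = fullyApplied
```

Because the consumer decides on each step whether to call await or lift await, it is free to base that choice on whatever state it carries.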

Gabriella Gonzalez

We're using Machines in Scala to pull in not just two, but an arbitrary number of sources.

Two examples of binary joins are provided by the library itself, in the Tee module: mergeOuterJoin and hashJoin. Here is what the code for hashJoin looks like (it reads the entire left stream into a map before consuming the right one, so as written it does not rely on the inputs being sorted):

/**
 * A natural hash join according to keys of type `K`.
 */
def hashJoin[A, B, K](f: A => K, g: B => K): Tee[A, B, (A, B)] = {
  def build(m: Map[K, A]): Plan[T[A, B], Nothing, Map[K, A]] = (for {
    a  <- awaits(left[A])
    mp <- build(m + (f(a) -> a))
  } yield mp) orElse Return(m)
  for {
    m <- build(Map())
    r <- (awaits(right[B]) flatMap (b => {
      val k = g(b)
      if (m contains k) emit(m(k) -> b) else Return(())
    })) repeatedly
  } yield r
}

This code builds up a Plan which is "compiled" to a Machine with the repeatedly method. The type being built here is Tee[A, B, (A, B)] which is a machine with two inputs. You request inputs on the left and right with awaits(left) and awaits(right), and you output with emit.
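To illustrate the idea of a consumer that names the input it wants next, here is a minimal hand-rolled sketch in plain Haskell. It is not the machines API: the type Tee2, the driver runTee2 and the function mergeInner are hypothetical names invented for this example. It implements the merge of two key-sorted streams described in the question's comments, under the assumption that keys are unique within each stream.

```haskell
-- A two-input consumer: every step either stops, emits a value,
-- or asks for the next element of the left or right input.
-- The second field of each Await is the fallback used when that
-- input is exhausted.
data Tee2 a b o
  = Stop
  | Emit o (Tee2 a b o)
  | AwaitL (a -> Tee2 a b o) (Tee2 a b o)
  | AwaitR (b -> Tee2 a b o) (Tee2 a b o)

-- Drive a Tee2 from two lazy lists, pulling only from the side
-- that the consumer requests.
runTee2 :: Tee2 a b o -> [a] -> [b] -> [o]
runTee2 Stop         _      _      = []
runTee2 (Emit o k)   as     bs     = o : runTee2 k as bs
runTee2 (AwaitL k _) (a:as) bs     = runTee2 (k a) as bs
runTee2 (AwaitL _ e) []     bs     = runTee2 e [] bs
runTee2 (AwaitR k _) as     (b:bs) = runTee2 (k b) as bs
runTee2 (AwaitR _ e) as     []     = runTee2 e as []

-- Inner merge join: pair elements with equal keys and drop the rest.
-- Assumes both inputs are sorted ascending by key.
mergeInner :: Ord k => (a -> k) -> (b -> k) -> Tee2 a b (a, b)
mergeInner ka kb = start
  where
    start = AwaitL (\a -> AwaitR (\b -> step a b) Stop) Stop
    step a b = case compare (ka a) (kb b) of
      EQ -> Emit (a, b) start                -- keys match: emit the pair
      LT -> AwaitL (\a' -> step a' b) Stop   -- left is behind: advance left
      GT -> AwaitR (\b' -> step a b') Stop   -- right is behind: advance right

main :: IO ()
main = print $ runTee2 (mergeInner fst fst)
  [(1, "a"), (2, "b"), (4, "d")]
  [(2, "x"), (3, "y"), (4, "z")]
-- prints [((2,"b"),(2,"x")),((4,"d"),(4,"z"))]
```

The comparison result in step is the state that decides which source is pulled next, which is the behaviour the question asks about.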

There is also a Haskell version of Machines.

Apocalisp

Conduit has a zip primitive that takes two upstreams and combines them into a stream of tuples. The same can be built for Pipes, but that code hasn't been released yet.
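As a hedged illustration, and assuming a recent conduit release (1.3 or later) in which zipping is exposed through the ZipSource applicative wrapper rather than the function this answer originally referred to, combining two sources might be sketched like this. The name zipped and the sample inputs are made up for the example.

```haskell
import Data.Conduit (ConduitT, ZipSource (..), runConduitPure, (.|))
import qualified Data.Conduit.List as CL

-- Combine two sources element by element into a stream of tuples.
-- The combined stream ends as soon as either source is exhausted.
zipped :: Monad m => ConduitT () (Int, Char) m ()
zipped = getZipSource $
    (,) <$> ZipSource (CL.sourceList [1, 2, 3])
        <*> ZipSource (CL.sourceList "abcd")

main :: IO ()
main = print $ runConduitPure (zipped .| CL.consume)
-- prints [(1,'a'),(2,'b'),(3,'c')]
```

Zipping always advances both sources in lockstep, so it suits pairing by position; merging by key needs a consumer that can choose which side to advance.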

singpolyma

Check out the pipes library, where vertical concatenation might do what you want. For example,

import Control.Pipe
import Control.Monad
import Control.Monad.State
import Data.Void

source0, source1 :: Producer Char IO ()
source0 = mapM_ yield "say"
source1 = mapM_ yield "what"

sink :: Show b => Consumer b IO ()
sink = forever $ await >>= \x -> lift $ print x

pipeline :: Pipe () Void IO ()
pipeline = sink <+< (source0 >> source1)

The sequencing operator (>>) vertically concatenates the sources, so source0 is drained completely before source1 is read. Running the pipeline with runPipe prints:

's'
'a'
'y'
'w'
'h'
'a'
't'
jtobin