
I'm looking to build a pipeline pattern in Scala. Once the pipeline objects are written, I would like to connect them like this:

Pipeline1 :: Pipeline2 :: Pipeline3 ...

I have experimented with a few ideas so far. Some work and some don't. But none of them seems to completely get rid of boilerplate code. The following is the closest I've got.

First, define the abstract classes Pipeline and Source:

// I is the input type and O is the output type of the pipeline
abstract class Pipeline[I, +O](p: Pipeline[_, _ <: I]) {

  val source = p
  val name: String
  def produce(): O
  def stats():String
}
abstract class Source[+T] extends Pipeline[AnyRef, T](null)

Next, I create two pipelines and try to link them together:

// this creates a random integer
class RandomInteger extends Source[Int] {
  override val name = "randInt"

  def produce() = {
    scala.math.round(scala.math.random.toFloat * 10)
  }

  def stats()="this pipeline is stateless"
}

// multiply it by ten
class TimesTen(p: Pipeline[_, Int]) extends Pipeline[Int, Int](p) {
  private var count = 0 // this is a simple state of the pipeline
  override val name = "Times"
  def produce() = {
    val i = source.produce()
    count += 1 // updating the state
    i * 10
  }
  def stats() = "this pipeline has been called for " + count + " times"
}

object TimesTen {
  // this code achieves the desired connection using ::
  // but this has to be repeated in each pipeline subclass. 
  // how to remove or abstract away this boilerplate code? 
  def ::(that: Pipeline[_, Int]) = new TimesTen(that)
}

This is the main class where two pipelines are linked.

object Pipeline {
  def main(args: Array[String]): Unit = {
    val p = new RandomInteger() :: TimesTen
    println(p.source)
    for (i <- 0 to 10)
      println(p.produce())
    println(p.stats())
  }
}

This code works, but I would have to repeat the :: definition from the TimesTen companion object in every pipeline class I write, which is clearly undesirable. Is there a better way to do this? Reflection might work, but I have heard it discouraged as a sign of bad design, and I'm also unsure how well Scala supports it.

Thank you for your time.

Update: I designed this toy problem to make it easy to understand. In the general case, and in my application, each pipeline object has state, which should ideally be encapsulated within the object itself rather than exposed to every other pipeline. I have modified the code above to reflect this. I am hoping for an object-based solution. I'm still experimenting and will let you know if I find one.

Update 2: After some thought, I think a pipeline is really just a generalized function that carries some internal state and can compose a Function0 with a Function1. In Scala, Function0 has neither a compose() nor an andThen() method.

Albert Li

4 Answers

10

Here is an object-based solution using andThen. The idea is to make every stage a Function1 by giving sources an input of type Unit. Connecting two Pipelines creates a new Pipeline that chains their two functions. This solution allows Pipelines to keep internal state.

A further simplification would be to use apply() instead of produce(). This is left as an exercise for the reader.

abstract class Pipeline[-I, +O] {

  val name: String
  def produce : I => O
  def stats(): String

  def ->[X](seg:Pipeline[_ >: O, X]):Pipeline[I, X] = {
    val func = this.produce
    val outerName = this.name
    new Pipeline[I, X] {
      val name = outerName + "." + seg.name
      def produce = func andThen seg.produce 
      def stats = seg.stats
    }
  }
}

abstract class Source[+T] extends Pipeline[Unit, T] {
}

class RandomInteger extends Source[Int] {
  override val name = "randInt"
  def produce: Unit => Int = (x:Unit) => scala.math.round(scala.math.random.toFloat * 10)
  def stats() = "stateless"
}

class TimesTen() extends Pipeline[Int, Int] {
  private var count = 0
  override val name = "times"
  def produce : Int => Int = (x:Int) => {    
    count += 1
    x * 10
  }
  def stats() = "called for " + count + " times"
}


object Main {
  def main(args: Array[String]): Unit = {
    val p = new RandomInteger() -> new TimesTen()

    for (i <- 0 to 10)
      println(p.produce(())) // produce is a Unit => Int, so pass the unit value explicitly
    println(p.name)    // print "randInt.times"
    println(p.stats()) // print "called for 11 times"
  }
}
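
The apply() simplification mentioned above could look roughly like the following. This is only a sketch, not the code from the answer: the class name Stage and the object ApplyDemo are made up for the illustration, and the random source uses scala.util.Random.nextInt instead of the rounding expression.

```scala
// Sketch: a stage is itself a function, so apply() takes the place of produce().
// "Stage" is a hypothetical name chosen for this illustration.
abstract class Stage[-I, +O] extends (I => O) { self =>
  def name: String
  def stats(): String

  // Connect this stage to the next one; the result is again a Stage.
  def ->[X](next: Stage[O, X]): Stage[I, X] = new Stage[I, X] {
    val name = self.name + "." + next.name
    def apply(in: I): X = next(self(in))
    def stats(): String = next.stats()
  }
}

// A source takes Unit as input, as in the answer above.
class RandomInteger extends Stage[Unit, Int] {
  val name = "randInt"
  def apply(in: Unit): Int = scala.util.Random.nextInt(11) // a value from 0 to 10
  def stats(): String = "stateless"
}

class TimesTen extends Stage[Int, Int] {
  private var count = 0 // internal state stays private to the stage
  val name = "times"
  def apply(x: Int): Int = { count += 1; x * 10 }
  def stats(): String = "called for " + count + " times"
}

object ApplyDemo {
  def main(args: Array[String]): Unit = {
    val p = new RandomInteger() -> new TimesTen()
    for (_ <- 0 to 10) println(p(())) // p is called like a plain function
    println(p.name)    // prints "randInt.times"
    println(p.stats()) // prints "called for 11 times"
  }
}
```

Because Stage extends Function1, the standard andThen and compose methods are also available on every stage, although they return plain functions without name or stats.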
metch
Albert Li
9

Unless I'm missing something, your pipeline objects are just functions, and your :: operator is just "compose":

val randomInteger: ()=>Int = () => scala.math.round(scala.math.random.toFloat * 10)
val timesTen :Int => Int = x => x*10    
val pipeline: () =>Int = timesTen compose randomInteger

Your "produce()" method is just "apply()", which is usually abbreviated to "()". A small amount of library pimping (adding methods through an implicit conversion) would let you use an operator for composition. This is one of those cases where object-oriented boilerplate really gets in the way of simple functional concepts. Fortunately, Scala lets you avoid the boilerplate for a lot of use cases like this.
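
As a rough sketch of that kind of library enrichment, assuming Scala 2.10 or later for implicit classes: the names PipelineSyntax, SourceOps, StageOps and ComposeDemo, and the |> operator itself, are invented for this illustration and are not part of the standard library. The enrichment on Function0 also covers the case where the source takes no arguments.

```scala
object PipelineSyntax {
  // Hypothetical enrichment: lets a no-argument source feed a one-argument stage.
  implicit class SourceOps[A](source: () => A) {
    def |>[B](stage: A => B): () => B = () => stage(source())
  }

  // The same operator for ordinary one-argument functions.
  implicit class StageOps[A, B](f: A => B) {
    def |>[C](g: B => C): A => C = f andThen g
  }
}

object ComposeDemo {
  import PipelineSyntax._

  val randomInteger: () => Int = () => scala.util.Random.nextInt(11)
  val timesTen: Int => Int = _ * 10
  val plusOne: Int => Int = _ + 1

  // Reads left to right: draw a number, multiply it by ten, add one.
  val pipeline: () => Int = randomInteger |> timesTen |> plusOne

  def main(args: Array[String]): Unit =
    println(pipeline())
}
```

The operator name does not end in a colon, so it associates to the left and the stages read in the order they run.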

Dave Griffith
  • IMHO objects could still be useful. Pipelines could have states, which could comfortably fit in an object. I think with the compose function, the problem can be solved. One solution is to create a helper object which contains a list of pipeline objects, and compose them together. I'll see if that works tomorrow. – Albert Li Mar 07 '12 at 04:06
  • Your code does not compile on 2.9.1 - it says that "randomInteger" does not have "compose" method. (as well as "andThen" method) – Rogach Mar 07 '12 at 04:07
  • `randomInteger compose timesTen` is the wrong way around. You can't take `Unit`, multiply it by ten, and then feed that to `randomInteger`. Instead you want `randomInteger andThen timesTen` or `timesTen compose randomInteger`. Making use of `Function1` is definitely the most convenient way to make this "pipeline" idea work. – Dan Burton Mar 07 '12 at 04:21
  • @AlbertLi BTW, you can use reduce to process a large list of chained functions: `(func1::func2::func3::func4::Nil).reduce{_ andThen _}` – om-nom-nom Mar 07 '12 at 10:51
  • @om-nom-nom No need for `reduce`, the standard library has `Function.chain[A](s: Seq[(A) => A]): (A) => A` – Alexander Azarov Mar 07 '12 at 16:32
  • Even after modification, this produces a compilation error: `error: type mismatch; found : () => Int required: ? => Int`. This is because `compose` takes a `Function1` argument, while `randomInteger` is a `Function0`. `Function.chain` does not work either, because it requires all chained functions to have the same input and output type. – Albert Li Mar 08 '12 at 23:38
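
To make the last few comments concrete, here is a minimal sketch of both chaining approaches for stages that share one type, plus a source written as Unit => Int so that it is a Function1 and andThen applies. The object ChainDemo and the value names are made up for the example, and the source is a constant so the result is predictable.

```scala
object ChainDemo {
  val timesTen: Int => Int = _ * 10
  val plusOne: Int => Int = _ + 1

  // All stages share the type Int => Int, which is what Function.chain requires.
  val stages: List[Int => Int] = List(timesTen, plusOne, timesTen)

  // Both fold the list into a single function, applied left to right.
  val viaReduce: Int => Int = stages.reduce(_ andThen _)
  val viaChain: Int => Int = Function.chain(stages)

  // A source typed Unit => Int is a Function1, so andThen is available on it.
  val constantSource: Unit => Int = _ => 3
  val pipeline: Unit => Int = constantSource andThen viaChain

  def main(args: Array[String]): Unit =
    println(pipeline(())) // (3 * 10 + 1) * 10 = 310
}
```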
4
object Pipelining {
  // |> rather than ::, because operators ending in ':' are right-associative
  implicit def toPipe[T](x: T) = new {
    def |>[U](f: T => U) = f(x)
  }
}

import Pipelining._
List(2,3,4) |> (_.map(_*3)) |> (_.map(_.toString)) |> println

All credit goes to StephaneLD's "|> operator like in F#":

http://www.scala-lang.org/node/8747

clagccs
0

Do you mean something like Dataflow or Functional Reactive Programming? Try this question. The reactive library is actively developed; I don't know about the rest.

Community
Daniel C. Sobral
  • I guess it's more like a generalized function with some internal states and the ability to compose a Function0 with a Function1 – Albert Li Mar 08 '12 at 23:23