I'm new to Scala and I'm facing a few problems in my assignment. I want to build a stream class that supports three main operations: filter, map, and forEach. My stream's data is an array of elements. Each of the three operations should run on two different threads over the stream's array. In addition, I need to separate declaring an operation from actually running it: first declare all operations on the stream, and only when I call stream.run() should the actual work happen.

My code:

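import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.ExecutionContext
import scala.collection.mutable.ArrayBuffer

class LearningStream[A]() {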
  val es: ExecutorService = Executors.newFixedThreadPool(2)
  val ec = ExecutionContext.fromExecutorService(es)
  var streamValues: ArrayBuffer[A] = ArrayBuffer[A]()
  var r: Runnable = () => "";

  def setValues(streamv: ArrayBuffer[A]) = {
    streamValues = streamv;
  }

  def filter(p: A => Boolean): LearningStream[A] = {
    var ls_filtered: LearningStream[A] = new LearningStream[A]()
    r = () => {
      println("running real filter..")
      val (l,r) = streamValues.splitAt(streamValues.length/2)
      val a:ArrayBuffer[A]=es.submit(()=>l.filter(p)).get()
      val b:ArrayBuffer[A]=es.submit(()=>r.filter(p)).get()
      ls_filtered.setValues(a++b)
    }
    return ls_filtered
  }

  def map[B](f: A => B): LearningStream[B] = {
    var ls_map: LearningStream[B] = new LearningStream[B]()
    r = () => {
      println("running real map..")
      val (l,r) = streamValues.splitAt(streamValues.length/2)
      val a:ArrayBuffer[B]=es.submit(()=>l.map(f)).get()
      val b:ArrayBuffer[B]=es.submit(()=>r.map(f)).get()
      ls_map.setValues(a++b)
    }
    return ls_map
  }

  def forEach(c: A => Unit): Unit = {
    r=()=>{
      println("running real forEach")
      streamValues.foreach(c)}
  }

   def insert(a: A): Unit = {
    streamValues += a
  }

  def start(): Unit = {
    ec.submit(r)
  }

   def shutdown(): Unit = {
    ec.shutdown()
  }
}

My main:

def main(args: Array[String]): Unit = {
    var factorial=1
    val s = new LearningStream[String]
    s.filter(str=>str.startsWith("-")).map(s=>s.toInt*(-1)).forEach(i=>factorial=factorial*i)

    for(i <- -5 to 5){
      s.insert(i.toString)
    }
    println(s.streamValues)
    s.start()
    println(factorial)
    }

The main prints only the filter's output, and factorial isn't changed (still 1). What am I missing here?

JeyJ
  • (being perhaps a little cryptic because this looks like a homework assignment) The intended semantics of the stream are that it be set up and then run, correct? `filter` and friends result in new streams: are those new streams ever being run? – Levi Ramsey Jan 07 '20 at 16:10
  • One general observation: if I were grading this, using `return` would tend to result in a failing grade absent a comment explaining why `return` is correct (it basically never is). I'd also note that `var` is pretty non-idiomatic Scala and in most of the code you posted, unnecessary. – Levi Ramsey Jan 07 '20 at 16:15
  • Regarding return: it isn't necessary here, I'm just used to it from Java. Still, it doesn't change the behavior of the code. Why is var unnecessary? If I change the object then I can't use val. – JeyJ Jan 07 '20 at 16:20
  • You only need a var if you're going to reassign it. In your code, `streamValues` and `r` are the only vars you're reassigning. `var` or `val` make no difference in terms of what methods you can call on the object you've assigned to the var/val. See, for instance https://stackoverflow.com/questions/11386559/val-mutable-versus-var-immutable-in-scala. – Levi Ramsey Jan 07 '20 at 19:17
  • Oh, so you meant that the 2 streams I create can also be val. Yeah, got it, that is right. Still, it doesn't change the main issue here. – JeyJ Jan 07 '20 at 20:20
  • Look back at the first comment, and think about if you're ever starting those streams – Levi Ramsey Jan 07 '20 at 20:56
  • But I do run them: I call s.start, so the ExecutionContext should submit the task and start it. – JeyJ Jan 07 '21:38
  • You run `s` and the filter stage gets run. Are you creating other `LearningStream`s and are those `LearningStream`s getting run? – Levi Ramsey Jan 08 '20 at 14:58
  • The way I see it: create a new LearningStream s -> s runs the filter command, sets its runnable to filter its data, and returns a new stream ls_filter -> ls_filter runs map, which sets its runnable to map the data and returns a new stream ls_map -> ls_map sets its runnable to the foreach logic and exits. Now s holds the filter logic in its runnable; I submit s via the ExecutionContext and it runs the filter logic, but nothing runs the other streams afterwards. I see. Writing this helped me understand that each runnable needs to call the next stream's runnable. – JeyJ Jan 08 '20 at 18:29
  • @LeviRamsey I added ec.submit(the next LearningStream) to every runnable. If I want to close all of them recursively, is there a nice way to do it? Right now, shutdown kills only the filter threads that were created. – JeyJ Jan 08 '20 at 19:48
  • Each one of the streams/stages you're creating has its own `ExecutionContext`, so shutting down one doesn't shut the others down as well. – Levi Ramsey Jan 09 '20 at 18:56
  • yeah I know, but I found a solution. I will test it and update the post once it works – JeyJ Jan 10 '20 at 16:31

1 Answer

My solution: @Levi Ramsey left a few good hints in the comments, if you would rather have hints than the full solution.

First problem: only one command (filter) ran and the others didn't. Solution: at the end of each command's runnable, submit the next stream's runnable via:

ec.submit(ls_map.r)
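
The chaining step can be sketched roughly as follows. This is a minimal sketch under assumptions, not the code from the assignment: `ChainedStream`, `onHalves`, and `ChainedDemo` are hypothetical names, all stages share one two-thread pool, and each stage calls the next stage's `run()` directly instead of going through `ec.submit`, so that `run()` returns only after the whole chain has finished.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors}
import scala.collection.mutable.ArrayBuffer

// Hypothetical simplified stream; names and structure are illustrative only.
class ChainedStream[A](pool: ExecutorService) {
  private val values = ArrayBuffer.empty[A]
  // Deferred work for this stage; replaced by filter/map/forEach.
  private var task: () => Unit = () => ()

  def insert(a: A): Unit = values += a

  // Apply `work` to each half of the data, one pool task per half.
  private def onHalves[B](work: ArrayBuffer[A] => ArrayBuffer[B]): ArrayBuffer[B] = {
    val (left, right) = values.splitAt(values.length / 2)
    val l = pool.submit(new Callable[ArrayBuffer[B]] { def call(): ArrayBuffer[B] = work(left) })
    val r = pool.submit(new Callable[ArrayBuffer[B]] { def call(): ArrayBuffer[B] = work(right) })
    l.get() ++ r.get()
  }

  def filter(p: A => Boolean): ChainedStream[A] = {
    val next = new ChainedStream[A](pool)
    task = () => {
      next.values ++= onHalves[A](_.filter(p))
      next.run() // the step missing from the question: start the next stage
    }
    next
  }

  def map[B](f: A => B): ChainedStream[B] = {
    val next = new ChainedStream[B](pool)
    task = () => {
      next.values ++= onHalves[B](_.map(f))
      next.run() // hand the mapped data on to the following stage
    }
    next
  }

  def forEach(c: A => Unit): Unit = {
    task = () => values.foreach(c)
  }

  // Runs this stage on the calling thread; each stage then runs the next one.
  def run(): Unit = task()
}

object ChainedDemo {
  def product(): Int = {
    val pool = Executors.newFixedThreadPool(2)
    val s = new ChainedStream[String](pool)
    var factorial = 1
    s.filter(_.startsWith("-")).map(str => -str.toInt).forEach(i => factorial *= i)
    (-5 to 5).foreach(i => s.insert(i.toString))
    s.run()
    pool.shutdown()
    factorial
  }

  def main(args: Array[String]): Unit = println(product()) // prints 120
}
```

Because `run()` blocks until the last stage is done, the result can be read right after it returns. With `ec.submit`, the caller would have to wait on the returned future (or a latch) before printing.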

In order to be able to shut down all the executors, each LearningStream needs a reference to the next LearningStream as another data member. However, a plain LearningStream[A] field doesn't fit, because the next stream may have a different type parameter. Therefore, I implemented a trait that declares the close function, and the data member is of that trait type.
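
A minimal sketch of that trait idea, under assumptions: `Stoppable`, `Stage`, `linkTo`, and `StoppableDemo` are hypothetical names, and the stream operations themselves are left out so that only the shutdown chain is shown.

```scala
import java.util.concurrent.{ExecutorService, Executors}

// Hypothetical trait: it has no type parameter, so a Stage[A] can hold
// a reference to a following Stage[B] without knowing B.
trait Stoppable {
  def shutdown(): Unit
}

class Stage[A] extends Stoppable {
  // Each stage owns its own pool, as in the question.
  val es: ExecutorService = Executors.newFixedThreadPool(2)
  // The stage created after this one, if any.
  private var next: Option[Stoppable] = None

  // Stand-in for filter/map: creates the following stage and remembers it.
  def linkTo[B](): Stage[B] = {
    val stage = new Stage[B]
    next = Some(stage)
    stage
  }

  // Shut down this stage's pool, then recurse down the chain.
  override def shutdown(): Unit = {
    es.shutdown()
    next.foreach(_.shutdown())
  }
}

object StoppableDemo {
  def allStopped(): Boolean = {
    val first = new Stage[String]
    val second = first.linkTo[Int]()
    val third = second.linkTo[Int]()
    first.shutdown() // one call on the head of the chain
    Seq(first.es, second.es, third.es).forall(_.isShutdown)
  }
}
```

In Scala 2 a wildcard-typed field such as `Option[Stage[_]]` would probably work as well; the trait just keeps the field's type limited to the one method that is needed.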

halfer
JeyJ