
I am using Scala to write a Spark application that reads data from CSV files using DataFrames (none of these details really matter; my question can be answered by anyone who is good at functional programming).

I'm used to sequential programming, and it's taking a while to think of things in the functional way.

I basically want to read two columns (a, b) from a CSV file and keep track of those rows where b < 0.

I implemented this, but it's pretty much how I would do it in Java, and I would like to use Scala's features instead:

val ValueDF = fileDataFrame.select("colA", "colB")
val ValueArr = ValueDF.collect()

for (index <- 0 until ValueArr.length) {
    var row = ValueArr(index)
    var A = row(0).toString()
    var B = row(1).toString().toDouble

    if (B < 0) {
        // write A and B somewhere
    }
}

Converting the DataFrame to an array defeats the purpose of distributed computation. So how could I get the same result, but instead of building an array and traversing through it, perform transformations on the DataFrame itself (such as map/filter/flatMap etc.)?

I should get going soon hopefully; I just need some examples to wrap my head around it.

user3376961
  • If ValueArr is a collection, it should have the `filter()` function, which takes a function `(a,b) => Boolean` as a parameter and returns a collection of only those elements for which the function returned true. – thwiegan Aug 05 '15 at 19:23

1 Answer


You are basically doing a filtering operation (ignore the row if not B < 0) and a mapping (from each row, get A and B / do something with A and B).

You could write it like this:

val valueDF = fileDataFrame.select("colA", "colB")
val valueArr = valueDF.collect()

val result = valueArr
  .filter(_(1).toString().toDouble < 0)
  .map(row => (row(0).toString(), row(1).toString().toDouble))

// do something with result

You can also do the mapping first and then the filtering:

val result = valueArr
  .map(row => (row(0).toString(), row(1).toString().toDouble))
  .filter(_._2 < 0)

Scala also offers more convenient versions of this kind of operation (thanks Sascha Kolberg), called withFilter and collect. withFilter has the advantage over filter that it doesn't create a new collection, saving you one pass; see this answer for more details. With collect you also map and filter in one pass, passing a partial function which allows you to do pattern matching; see e.g. this answer.
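
For completeness, a withFilter version could look roughly like this (a quick sketch reusing the same valueArr as above):

val result = valueArr
  .withFilter(row => row(1).toString().toDouble < 0)   // lazy: no intermediate collection is built
  .map(row => (row(0).toString(), row(1).toString().toDouble))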

In your case collect would look like this:

val valueDF = fileDataFrame.select("colA", "colB")
val valueArr = valueDF.collect()

val result = valueArr.collect {
  case row if row(1).toString().toDouble < 0 => (row(0).toString(), row(1).toString().toDouble)
}
// do something with result

(I think there's a more elegant way to express this but that's left as an exercise ;))
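
As a hint, one possible direction (just a sketch) would be to convert each row only once and then collect the pairs with negative B:

val result = valueArr
  .map(row => (row(0).toString(), row(1).toString().toDouble))  // convert each row once
  .collect { case (a, b) if b < 0 => (a, b) }                   // keep only negative-B pairs

This avoids converting B twice, although, like the filter/map variants, it still traverses the collection twice.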

Also, there's a lightweight notation called "sequence comprehensions". With this you could write:

val result = for (row <- valueArr if row(1).toString().toDouble < 0) yield (row(0).toString(), row(1).toString().toDouble)

Or a more flexible variant, where the converted value is bound to a name first (yielding an Option and flattening at the end, since an if without an else would produce Unit for the skipped rows):

val result = (for (row <- valueArr) yield {
  val b = row(1).toString().toDouble
  if (b < 0) Some((row(0).toString(), b)) else None
}).flatten

Alternatively, you can use foldLeft:

val valueDF = fileDataFrame.select("colA", "colB")
val valueArr = valueDF.collect()

val result = valueArr.foldLeft(Seq[(String, Double)]()) { (s, row) =>
  val a = row(0).toString()
  val b = row(1).toString().toDouble

  if (b < 0) {
    s :+ ((a, b)) // append the (A, B) tuple to the results sequence
  } else {
    s // leave the results sequence unmodified
  }
}

// do something with result

All of these are considered functional... which one you prefer is for the most part a matter of taste. The first two examples (filter/map and map/filter) do have a performance disadvantage compared to the rest because they iterate through the sequence twice.
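
If that extra pass matters, a lazy view can fuse the map and filter into a single traversal (a rough sketch; the exact view API differs a bit between Scala versions):

val result = valueArr.view
  .map(row => (row(0).toString(), row(1).toString().toDouble))  // evaluated lazily
  .filter(_._2 < 0)
  .toArray  // materialize the result in one pass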

Note that in FP it's very important to minimize side effects and isolate them from the main logic. I/O ("write A and B somewhere") is a side effect. So you will normally write your functions so that they don't have side effects - just input -> output logic, without affecting or retrieving data from the surroundings. Once you have a final result, you can perform the side effects. In this concrete case, once you have result (which is a sequence of (A, B) tuples), you can loop through it and print it. This way you can easily change how it is printed (to the console, to a remote place, etc.) without touching the main logic.
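
For example, printing the result to the console could be as simple as this (the output format here is just a placeholder):

result.foreach { case (a, b) => println(s"$a, $b") } // side effect kept separate from the computation of result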

Also, you should prefer immutable values (val) wherever possible, which is safer. Even in your loop, row, A and B are never modified, so there's no reason to use var.
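
For illustration, the original loop could keep its imperative shape and still use vals and a generator instead of index-based access (a minimal sketch of that change):

for (row <- valueArr) {
  val a = row(0).toString()
  val b = row(1).toString().toDouble
  if (b < 0) {
    // write a and b somewhere
  }
}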

(Btw, I corrected the value names to start with lower case, see conventions).

User
    Just a tiny thing about `filter().map()`: Whenever someone uses `filter()` followed by `map()` it is almost always better to instead use `withFilter().map()` or `collect { case => ???} ` which is basically what your first *for-comprehension* does (`val result = for (row <- valueArr if row(1).toString().toDouble < 0) yield (row(0).toString(), row(1).toString().toDouble)`). – Sascha Kolberg Aug 06 '15 at 08:05
    @lxx Thanks a lot for the detailed answer! Exactly what I needed. – user3376961 Aug 13 '15 at 16:30