3

We are trying to generate column wise statistics of our dataset in spark. In addition to using the summary function from statistics library. We are using the following procedure:

  1. We determine the columns with string values

  2. Generate key value pair for the whole dataset, using the column number as key and value of column as value

  3. generate a new map of format

    (K,V) ->((K,V),1)

Then we use reduceByKey to find the sum of all unique value in all the columns. We cache this output to reduce further computation time.

In the next step we cycle through the columns using a for loop to find the statistics for all the columns.

We are trying to reduce the for loop by again utilizing the map reduce way but we are unable to find some way to achieve it. Doing so will allow us to generate column statistics for all columns in one execution. The for loop method is running sequentially making it very slow.

Code:

//drops the header

    def dropHeader(data: RDD[String]): RDD[String] = {
         data.mapPartitionsWithIndex((idx, lines) => {
           if (idx == 0) {
             lines.drop(1)
           }
           lines
         })
       }

    def retAtrTuple(x: String) = {
       val newX = x.split(",")
       for (h <- 0 until newX.length) 
          yield (h,newX(h))
    }



    val line = sc.textFile("hdfs://.../myfile.csv")

    val withoutHeader: RDD[String] = dropHeader(line)

    val kvPairs = withoutHeader.flatMap(retAtrTuple) //generates a key-value pair where key is the column number and value is column's value


    var bool_numeric_col = kvPairs.map{case (x,y) => (x,isNumeric(y))}.reduceByKey(_&&_).sortByKey()    //this contains column indexes as key and boolean as value (true for numeric and false for string type)

    var str_cols = bool_numeric_col.filter{case (x,y) => y == false}.map{case (x,y) => x}
    var num_cols = bool_numeric_col.filter{case (x,y) => y == true}.map{case (x,y) => x}

    var str_col = str_cols.toArray   //array consisting the string col
    var num_col = num_cols.toArray   //array consisting numeric col


    val colCount = kvPairs.map((_,1)).reduceByKey(_+_)
    val e1 = colCount.map{case ((x,y),z) => (x,(y,z))}
    var numPairs = e1.filter{case (x,(y,z)) => str_col.contains(x) }

    //running for loops which needs to be parallelized/optimized as it sequentially operates on each column. Idea is to find the top10, bottom10 and number of distinct elements column wise
    for(i <- str_col){
       var total = numPairs.filter{case (x,(y,z)) => x==i}.sortBy(_._2._2)
       var leastOnes = total.take(10)
       println("leastOnes for Col" + i)
       leastOnes.foreach(println)
       var maxOnes = total.sortBy(-_._2._2).take(10)
       println("maxOnes for Col" + i)
       maxOnes.foreach(println)
       println("distinct for Col" + i + " is " + total.count)
    }
  • 2
    So what is the question though? – serejja Jan 27 '15 at 09:10
  • serejja the question is to use the same (K,V) pair, generated in colCount, which contains the data in format : (colIndex,(colValue,NumoftimesValueOccured)) to find out the summary for each column. or essentially avoiding the serial execution –  Jan 27 '15 at 09:17
  • This is a particularly bad way to represent your data in my opinion. Why not make a separate RDD for each column? Like mentioned in this answer: http://stackoverflow.com/questions/28137208/summary-statistics-for-string-types-in-spark/28141077#28141077 – Daniel Darabos Jan 27 '15 at 14:11
  • Thanks Daniel I also looked at that solution but my concern is how will we be able to execute such summary function in parallel for each column if we arrange the each column in seperate RDD. In that case I assume we will end up executing the same function on each column which may be quite inefficient too. My main objective was to achieve this operation in one go, like we can with hadoop MapReduce. @DanielDarabos –  Jan 27 '15 at 18:01
  • As I understand it, you need one shuffle per column. So this is impossible to do in one go, independent of your choice of distributed computing system. If you say this is possible to do in one go in MapReduce, then I must be misunderstanding what you're trying to do. Please try to improve the question. – Daniel Darabos Jan 27 '15 at 18:11
  • Let me try to explain how Hadoop MapReduce will achieve it in one go, maybe that will help. In the map stage we will emit (K,V) of form K = index of column and V = value for that column. In the reduce stage each reducer will work on a separate column ( Different keys) and will have access to the whole sequence of values for that column. This will allow us to iterate through the values and extract necessary statistics. The motivation for using the mentioned (K,V) pair comes from the corresponding hadoop implementation. @DanielDarabos –  Jan 28 '15 at 04:06
  • 1
    Ah, thank you I see what you mean. Indeed this is hard to reproduce in Spark. Once you have split RDDs, operating on them is just like you want. You need to run the same function on each column, but each run would only touch that column, so you would in effect only pass through the entirety of the data once. But the problem is that you cannot split the data into multiple RDDs in one pass. This is a shortcoming of Spark I feel. I'll add a longer answer with a possible (awkward) solution later today. – Daniel Darabos Jan 28 '15 at 09:47

2 Answers2

2

Let me simplify your question a bit. (A lot actually.) We have an RDD[(Int, String)] and we want to find the top 10 most common Strings for each Int (which are all in the 0–100 range).

Instead of sorting, as in your example, it is more efficient to use the Spark built-in RDD.top(n) method. Its run-time is linear in the size of the data, and requires moving much less data around than a sort.

Consider the implementation of top in RDD.scala. You want to do the same, but with one priority queue (heap) per Int key. The code becomes fairly complex:

import org.apache.spark.util.BoundedPriorityQueue // Pretend it's not private.

def top(n: Int, rdd: RDD[(Int, String)]): Map[Int, Iterable[String]] = {
  // A heap that only keeps the top N values, so it has bounded size.
  type Heap = BoundedPriorityQueue[(Long, String)]
  // Get the word counts.
  val counts: RDD[[(Int, String), Long)] =
    rdd.map(_ -> 1L).reduceByKey(_ + _)
  // In each partition create a column -> heap map.
  val perPartition: RDD[Map[Int, Heap]] =
    counts.mapPartitions { items =>
      val heaps =
        collection.mutable.Map[Int, Heap].withDefault(i => new Heap(n))
      for (((k, v), count) <- items) {
        heaps(k) += count -> v
      }
      Iterator.single(heaps)
    }
  // Merge the per-partition heap maps into one.
  val merged: Map[Int, Heap] =
    perPartition.reduce { (heaps1, heaps2) =>
      val heaps =
        collection.mutable.Map[Int, Heap].withDefault(i => new Heap(n))
      for ((k, heap) <- heaps1.toSeq ++ heaps2.toSeq) {
        for (cv <- heap) {
          heaps(k) += cv
        }
      }
      heaps
    }
  // Discard counts, return just the top strings.
  merged.mapValues(_.map { case(count, value) => value })
}

This is efficient, but made painful because we need to work with multiple columns at the same time. It would be way easier to have one RDD per column and just call rdd.top(10) on each.

Unfortunately the naive way to split up the RDD into N smaller RDDs does N passes:

def split(together: RDD[(Int, String)], columns: Int): Seq[RDD[String]] = {
  together.cache // We will make N passes over this RDD.
  (0 until columns).map {
    i => together.filter { case (key, value) => key == i }.values
  }
}

A more efficient solution could be to write out the data into separate files by key, then load it back into separate RDDs. This is discussed in Write to multiple outputs by key Spark - one Spark job.

Community
  • 1
  • 1
Daniel Darabos
  • 26,991
  • 10
  • 102
  • 114
0

Thanks for @Daniel Darabos's answer. But there are some mistakes.

  1. mixed use of Map and collection.mutable.Map

  2. withDefault((i: Int) => new Heap(n)) do not create a new Heap when you set heaps(k) += count -> v

  3. mix uasage of parentheses

Here is the modified code:

//import org.apache.spark.util.BoundedPriorityQueue // Pretend it's not private. copy to your own folder and import it
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object BoundedPriorityQueueTest {

  //  https://stackoverflow.com/questions/28166190/spark-column-wise-word-count
  def top(n: Int, rdd: RDD[(Int, String)]): Map[Int, Iterable[String]] = {
    // A heap that only keeps the top N values, so it has bounded size.
    type Heap = BoundedPriorityQueue[(Long, String)]
    // Get the word counts.
    val counts: RDD[((Int, String), Long)] =
    rdd.map(_ -> 1L).reduceByKey(_ + _)
    // In each partition create a column -> heap map.
    val perPartition: RDD[collection.mutable.Map[Int, Heap]] =
    counts.mapPartitions { items =>
      val heaps =
        collection.mutable.Map[Int, Heap]() // .withDefault((i: Int) => new Heap(n))
      for (((k, v), count) <- items) {
        println("\n---")
        println("before add " + ((k, v), count) + ", the map is: ")
        println(heaps)
        if (!heaps.contains(k)) {
          println("not contains key " + k)
          heaps(k) = new Heap(n)
          println(heaps)
        }
        heaps(k) += count -> v
        println("after add " + ((k, v), count) + ", the map is: ")
        println(heaps)

      }
      println(heaps)
      Iterator.single(heaps)
    }
    // Merge the per-partition heap maps into one.
    val merged: collection.mutable.Map[Int, Heap] =
    perPartition.reduce { (heaps1, heaps2) =>
      val heaps =
        collection.mutable.Map[Int, Heap]() //.withDefault((i: Int) => new Heap(n))
      println(heaps)
      for ((k, heap) <- heaps1.toSeq ++ heaps2.toSeq) {
        for (cv <- heap) {
          heaps(k) += cv
        }
      }
      heaps
    }
    // Discard counts, return just the top strings.
    merged.mapValues(_.map { case (count, value) => value }).toMap
  }

  def main(args: Array[String]): Unit = {
    Logger.getRootLogger().setLevel(Level.FATAL) //http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console
    val conf = new SparkConf().setAppName("word count").setMaster("local[1]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN") //http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console


    val words = sc.parallelize(List((1, "s11"), (1, "s11"), (1, "s12"), (1, "s13"), (2, "s21"), (2, "s22"), (2, "s22"), (2, "s23")))
    println("# words:" + words.count())

    val result = top(1, words)

    println("\n--result:")
    println(result)
    sc.stop()

    print("DONE")
  }

}
Chao Liang
  • 41
  • 4
  • 1
    Considering that the other answer is both upvoted and accepted as the correct one, you should really explain what errors in the code of the other answer, your code corrects. – Tom Brunberg Feb 11 '18 at 17:29
  • @TomBrunberg Thanks for your friendly reminder. I've just listed what I change in my answer. – Chao Liang Feb 12 '18 at 06:18
  • Thank you! Perhaps one more thing: You did address @DanielDarabos in your post to draw his attention to your criticism. That is a fair and good gesture. However, I'm not sure the addressing sign (@) works in question and answer posts, so I added it also here, in this comment, just in case. Cheers. – Tom Brunberg Feb 12 '18 at 06:34