
My task is to write code that reads a big file (one that doesn't fit into memory), reverses it, and outputs the five most frequent words.
I have written the code below, and it does the job.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object ReverseFile {
  def main(args: Array[String]) {


    val conf = new SparkConf().setAppName("Reverse File")
    conf.set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)
    val txtFile = "path/README_mid.md"
    val txtData = sc.textFile(txtFile)
    txtData.cache()

    val tmp = txtData.map(l => l.reverse).zipWithIndex().map{ case(x,y) => (y,x)}.sortByKey(ascending = false).map{ case(u,v) => v}

    tmp.coalesce(1,true).saveAsTextFile("path/out.md")

    val txtOut = "path/out.md"
    val txtOutData = sc.textFile(txtOut)
    txtOutData.cache()

    val wcData = txtOutData.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).map(item => item.swap).sortByKey(ascending = false)
    wcData.collect().take(5).foreach(println)


  }
}

The problem is that I'm new to Spark and Scala. As you can see in the code, I first read the file, reverse it, and save it; then I read the reversed file back and output the five most frequent words.

  • Is there a way to tell Spark to save tmp and process wcData at the same time (without having to save and reopen the file)? Otherwise it's like reading the file twice.
  • From now on I'm going to be working with Spark a lot, so if there is any part of the code (not things like the absolute path name ... Spark-specific parts) that you think could be written better, I'd appreciate it.

2 Answers

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object ReverseFile {
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Reverse File")
    conf.set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)
    val txtFile = "path/README_mid.md"
    val txtData = sc.textFile(txtFile)
    txtData.cache()

    val reversed = txtData
      .zipWithIndex()
      .map(_.swap)
      .sortByKey(ascending = false)
      .map(_._2) // No need to deconstruct the tuple.

    // No need for the coalesce, spark should do that by itself.
    reversed.saveAsTextFile("path/reversed.md")

    // Reuse txtData here.
    val wcData = txtData
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .map(_.swap)
      .sortByKey(ascending = false)

    wcData
      .take(5) // Take already collects.
      .foreach(println)
  }
}

Always do the collect() last, so Spark can evaluate things on the cluster.
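As a minimal sketch of that principle (reusing wcData from the code above), compare pulling everything to the driver first with letting take fetch only what it needs:

// Collecting first materializes the whole RDD on the driver,
// and everything afterwards runs in local driver memory:
val everything = wcData.collect()
everything.take(5).foreach(println)

// Taking first scans only as many partitions as it needs,
// so the heavy work stays on the cluster and only five
// (count, word) pairs travel back to the driver:
wcData.take(5).foreach(println)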

  • Take(5) and collect, why? – eliasah Nov 07 '15 at 15:06
  • Thanks for the answer, but a couple of notes: 1 - without coalesce, Spark saves the file as partitions and I want to save it as one; 2 - take(5) before collect() gives a compilation error (not enough arguments for collect); 3 - can you please provide me with a link to a tutorial about all this map(_ + _) / map(_._2) underscoring? Thanks a lot. – Epsilon Nov 07 '15 at 15:43
  • 1 - OK, change as needed. 2 - Fixed the collect. 3 - https://stackoverflow.com/questions/8000903/what-are-all-the-uses-of-an-underscore-in-scala#8000934 – Reactormonk Nov 07 '15 at 17:39
  • @eliasah Good question is why sort :) – zero323 Nov 07 '15 at 19:05
  • @zero323 I didn't look that far. :) – eliasah Nov 07 '15 at 20:35
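To unpack the underscore shorthand asked about in the comments, here is a small standalone sketch of my own (plain Scala collections, not Spark) showing what each form expands to:

val pairs = Seq(("spark", 3), ("scala", 2))

// _ + _ expands to (a, b) => a + b
val total = Seq(1, 2, 3).reduce(_ + _)   // same as .reduce((a, b) => a + b)

// _._2 expands to p => p._2 (second element of a tuple)
val counts = pairs.map(_._2)             // Seq(3, 2)

// _.swap expands to p => p.swap (flip the pair)
val flipped = pairs.map(_.swap)          // Seq((3, "spark"), (2, "scala"))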

The most expensive part of your code is sorting, so the obvious improvement is to remove it. It is relatively simple in the second case, where the full sort is completely unnecessary:

val wcData = txtData
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _) // No need to swap or sort

// Use the top method with an explicit ordering in place of swap / sortByKey
val top5 = wcData.top(5)(scala.math.Ordering.by[(String, Int), Int](_._2))
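If I read the Spark sources right, top keeps only a bounded number of candidates per partition (a bounded priority queue under the hood) and merges them on the driver, so there is no shuffle and no full sort. It also returns a plain local Array, so top5.foreach(println) prints the result directly.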

Reversing the order of lines is a little bit trickier. First let's reorder the elements within each partition:

// Note that toList buffers a whole partition in memory at once
val reversedPartitions = txtData.mapPartitions(_.toList.reverse.toIterator)

Now you have two options:

  • use a custom partitioner

    import org.apache.spark.Partitioner

    class ReversePartitioner(n: Int) extends Partitioner {
      def numPartitions: Int = n
      def getPartition(key: Any): Int = {
        // Keys are the original partition indices; map i to n - 1 - i
        val k = key.asInstanceOf[Int]
        numPartitions - 1 - k
      }
    }
    
    val partitioner = new ReversePartitioner(reversedPartitions.partitions.size)
    
    val reversed = reversedPartitions
      // Add current partition number
      .mapPartitionsWithIndex((i, iter) => Iterator((i, iter.toList)))
      // Repartition to get reversed order
      .partitionBy(partitioner)
      // Drop partition numbers
      .values
      // Reshape
      .flatMap(identity)
    

    It still requires a shuffle, but it is relatively portable, and the data remains accessible in memory.

  • if all you want is to save the reversed data, you can call saveAsTextFile on reversedPartitions and reorder the output files logically. Since the part-n name format identifies the source partition, all you have to do is rename part-n to part-(number-of-partitions - 1 - n); a sketch of the renaming follows below. It requires saving the data, so it is not exactly optimal, but if you, for example, use an in-memory file system, it can be a pretty good solution.
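    A minimal sketch of that renaming using the Hadoop FileSystem API (the output directory name and the zero-padded part-file names are assumptions; check what saveAsTextFile actually produced):

    import org.apache.hadoop.fs.{FileSystem, Path}

    val outDir = "path/reversed.md" // directory written by saveAsTextFile
    val n = reversedPartitions.partitions.size
    val fs = FileSystem.get(sc.hadoopConfiguration)

    // Rename in two passes so that part-00000 -> part-00002
    // never collides with a part file that has not moved yet.
    for (i <- 0 until n)
      fs.rename(new Path(f"$outDir/part-$i%05d"), new Path(f"$outDir/tmp-$i%05d"))
    for (i <- 0 until n)
      fs.rename(new Path(f"$outDir/tmp-$i%05d"), new Path(f"$outDir/part-${n - 1 - i}%05d"))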
