0

i have to write a program in Scala, using spark which counts how many times a word occours in a text, but using the RDD my variable count always displays 0 at the end. Can you help me please? This is my code

import scala.io.Source
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object wordcount {
    def main(args: Array[String]) {
      // set spark context
      val conf = new SparkConf().setAppName("wordcount").setMaster("local[*]")
      val sc = new SparkContext(conf)

      val distFile = sc.textFile("bible.txt")

      print("Enter word to loook for in the HOLY BILE: ")
      val word = Console.readLine
      var count = 0;
      println("You entered " + word)

      for (bib <- distFile.flatMap(_.split(" "))) {

        if (word==bib) {
            count += 1

        }

      }  
      println(word + " occours " + count + " times in the HOLY BIBLE!")
    }
}
ScazzoMatto
  • 99
  • 1
  • 4
  • 12

5 Answers5

5

I suggest you to use available transformations in RDD instead of your own program (though its not harm) to get the desired result, for example following code could be used to retrieve the word count.

val word = Console.readLine
println("You entered " + word)
val input = sc.textFile("bible.txt")
val splitedLines = input.flatMap(line => line.split(" "))
                    .filter(x => x.equals(word))

System.out.println(splitedLines.count())

Please refer to this link for more information about the internals of Spark.

Community
  • 1
  • 1
Sathish
  • 4,975
  • 3
  • 18
  • 23
  • Downvoted only because the for block will result in a `foreach` so stating that no action is called is incorrect. – Justin Pihony Jun 09 '15 at 15:47
  • @Justin Pihony, I see you are right, so i removed my comments about the RDD actions. Thank you for point that out. – Sathish Jun 09 '15 at 16:05
  • I've got one more problem. I made the JAR of the program and i want to run it in spark over hadoop in the google cloud platform but I get the error that ClassNotFound even if it works when i play it in local in the spark stanalone. Can you help me please? thanks – ScazzoMatto Jun 10 '15 at 12:11
3

The problem is that you are using a mutable variable on a distributed set. This is hard to control in normal situations, and especially in Spark, the variable is copied separately to each worker. So, they end up with their own version of the count variable and the original is actually never updated. You would need to use an accumulator, which is only guaranteed for actions. All that said, you can accomplish this without variables or accumulators:

val splitData = distFile.flatMap(_.split(" "))
val finalCount = splitData.aggregate(0)(
  (accum, word) => if(word == bib) accum + 1 else accum,
  _ + _)

What this is doing is first seeding the count with a 0. Then, the first operation is what will be run on each partition. The accum is the accumulated count and the word is the current word to compare. The second operation is simply the combiner used to add all of the partition's counts together.

Justin Pihony
  • 66,056
  • 18
  • 147
  • 180
1
val textFile = sc.textFile("demoscala.txt")
val counts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("WordCountSpark")  

If anyone is confused of (_).Good blog below

http://www.codecommit.com/blog/scala/quick-explanation-of-scalas-syntax

Chinmoy
  • 1,391
  • 13
  • 14
0

I think that the iteration : bib <- distFile.flatMap(_.split(" ")) won't work, because your data is in the RDD, try to do a collect like :

for (bib<-distFile.flatMap(_.split(" ")).collect).

(it works just in case of your data are not huge, and you can make a collect on it)

otherwise, if your data set is huge you can do like :

val distFile = sc.textFile("bible.txt")
val word = Console.readLine
val count = distFile.flatMap(_.split(" ")).filter(l=>l==word).count
println(word + " occours " + count + " times in the HOLY BIBLE!")
ch9lb
  • 51
  • 3
  • What if the data is too large to collect onto the driver? This does not solve this from a big data perspective. It just turns it back into a single threaded, one machine problem. – Justin Pihony Jun 09 '15 at 14:36
-1
val text=sc.textfile("filename.txt")

val counts=text.flatmap(line=>line.split("")).map(word=>(word,1)).reduceByKey(_+_) counts.collect
Swanand
  • 4,027
  • 10
  • 41
  • 69