
When I use `cache` to store data, I found that Spark runs very slowly. However, when I don't use the `cache` method, the speed is very good. My main profile is as follows:

SPARK_JAVA_OPTS+="-Dspark.local.dir=/home/wangchao/hadoop-yarn-spark/tmp_out_info 
-Dspark.rdd.compress=true -Dspark.storage.memoryFraction=0.4 
-Dspark.shuffle.spill=false -Dspark.executor.memory=1800m -Dspark.akka.frameSize=100 
-Dspark.default.parallelism=6"

And my test code is:

val file = sc.textFile("hdfs://10.168.9.240:9000/user/bailin/filename")
val count = file.flatMap(line => line.split(" ")).map(word => (word, 1)).cache().reduceByKey(_ + _)
count.collect()

Any answers or suggestions on how I can resolve this are greatly appreciated.

Igor F.
bamboo2014
    I don't think this is a terrible question; not sure what the downvote is for. The use of the `cache` function is not always immediately clear. Also, the question isn't poorly written and shows effort – aaronman Jul 15 '14 at 14:10

1 Answer


`cache` is useless in the context you are using it. In this situation, `cache` is saying: save the result of the map, `.map(word => (word, 1))`, in memory. If you didn't call it, the reducer could be chained onto the end of the map and the map's results discarded after they are used. `cache` is better used in a situation where multiple transformations/actions will be called on the RDD after it is created. For example, if you create a dataset that you want to join to 2 different datasets, it is helpful to cache it, because if you don't, the whole RDD will be recalculated for the second join. Here is an easily understandable example from Spark's website.

val file = spark.textFile("hdfs://...")
val errors = file.filter(line => line.contains("ERROR")).cache() // errors is cached to prevent recalculation when the two filters are called
// Count all the errors
errors.count()
// Count errors mentioning MySQL
errors.filter(line => line.contains("MySQL")).count()
// Fetch the MySQL errors as an array of strings
errors.filter(line => line.contains("MySQL")).collect()
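The two-join case mentioned above could be sketched like this (a sketch only, assuming a running `SparkContext` named `sc`; the paths, key layout, and dataset names are hypothetical):

```scala
// Keyed dataset we intend to reuse; cache it once so both joins
// read the in-memory partitions instead of re-reading HDFS.
val users = sc.textFile("hdfs://.../users.csv")
  .map(line => (line.split(",")(0), line))
  .cache()

val orders = sc.textFile("hdfs://.../orders.csv")
  .map(line => (line.split(",")(0), line))
val payments = sc.textFile("hdfs://.../payments.csv")
  .map(line => (line.split(",")(0), line))

// Without the cache() above, the second join would recompute the
// entire `users` lineage (textFile + map) from scratch.
val usersWithOrders = users.join(orders)
val usersWithPayments = users.join(payments)
```

Note this code needs a Spark runtime to execute; it is meant only to show the shape of the reuse pattern where `cache` pays off.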

What `cache` does internally is remove the ancestors of an RDD by keeping it in memory or saving it to disk (depending on the storage level). The reason an RDD must otherwise retain its ancestors is so that it can be recalculated on demand; this is the recovery mechanism of RDDs.

aaronman
  • You explain how cache is expected to work, and to be used, but not why it is slow. See my [related but more precise question](http://stackoverflow.com/q/37859386/1206998) – Juh_ Jun 16 '16 at 12:40