0

I am trying to built a large amount of random forest models by group using Spark. My approach is to cache a large input data file, split it into pieces based on the school_id, cache the individual school input file in memory, run a model on each of them, and then extract the label and predictions.

model_input.cache()

val schools = model_input.select("School_ID").distinct.collect.flatMap(_.toSeq)
val bySchoolArray = schools.map(School_ID => model_input.where($"School_ID" <=> School_ID).cache)

import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel}

def trainModel(df: DataFrame): PipelineModel = {
   val rf  = new RandomForestClassifier()
   //omit some parameters
   val pipeline = new Pipeline().setStages(Array(rf))
   pipeline.fit(df)
}

val bySchoolArrayModels = bySchoolArray.map(df => trainModel(df))

val preds = (0 to schools.length -1).map(i => bySchoolArrayModels(i).transform(bySchoolArray(i)).select("prediction", "label")

preds.write.format("com.databricks.spark.csv").
option("header","true").
save("predictions/pred"+schools(i))

The code works fine on a small subset but it takes longer than I expected. It seems to me every time I run an individual model, Spark reads the entire file and it takes forever to complete all the model runs. I was wondering whether I did not cache the files correctly or anything went wrong with the way I code it.

Any suggestions would be useful. Thanks!

Community
  • 1
  • 1
SH Y.
  • 1,709
  • 3
  • 20
  • 21

1 Answers1

3

rdd's methods are immutable, so rdd.cache() returns a new rdd. So you need to assign the cachedRdd to an other variable and then re-use that. Otherwise your are not using the cached rdd.

val cachedModelInput = model_input.cache()
val schools = cachedModelInput.select("School_ID").distinct.collect.flatMap(_.toSeq)
....
  • Thanks for your answer! Is there a way to cache the the list of model inputs by school(bySchoolArray) and the list of models that stored in bySchoplArrayModels? – SH Y. Aug 30 '15 at 23:25
  • bySchoolArray in your code if cached in the right way. Think about that caching as effect only after an action, it's a lazy caching. – axlpado - Agile Lab Aug 31 '15 at 14:56
  • in order to cache models, you can do something like this: val bySchoolArrayModels = bySchoolArray.map(df => trainModel(df).cache()) – axlpado - Agile Lab Aug 31 '15 at 14:57
  • rdd.cache doesn't return a new rdd but an RDD.this.type – eliasah Oct 17 '15 at 18:11