35

I am trying to run a random forest classification using the Spark ML API, but I am having trouble building a data frame that the pipeline accepts as input.

Here is sample data:

age,hours_per_week,education,sex,salaryRange
38,40,"hs-grad","male","A"
28,40,"bachelors","female","A"
52,45,"hs-grad","male","B"
31,50,"masters","female","B"
42,40,"bachelors","male","B"

age and hours_per_week are integers, while the other features and the label salaryRange are categorical (String).

Loading this CSV file (let's call it sample.csv) can be done with the Spark CSV library like this:

val data = sqlContext.csvFile("/home/dusan/sample.csv")

By default all columns are imported as strings, so we need to convert "age" and "hours_per_week" to Int:

val toInt    = udf[Int, String]( _.toInt)
val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))

Just to check how schema looks now:

scala> dataFixed.printSchema
root
 |-- age: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- salaryRange: string (nullable = true)

Then let's set up the cross validator and pipeline:

val rf = new RandomForestClassifier()
val pipeline = new Pipeline().setStages(Array(rf)) 
val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)

The error shows up when running this line:

val cmModel = cv.fit(dataFixed)

java.lang.IllegalArgumentException: Field "features" does not exist.

It is possible to set the label column and the features column in RandomForestClassifier; however, I have 4 columns as predictors (features), not just one.

How should I organize my data frame so that its label and features columns are set up correctly?

For your convenience, here is the full code:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.DataFrame

import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.{Vector, Vectors}


object SampleClassification {

  def main(args: Array[String]): Unit = {

    //set spark context
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local");
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    import sqlContext.implicits._
    import com.databricks.spark.csv._

    //load data by using databricks "Spark CSV Library" 
    val data = sqlContext.csvFile("/home/dusan/sample.csv")

    //by default all columns are imported as string so we need to change "age" and  "hours_per_week" to Int
    val toInt    = udf[Int, String]( _.toInt)
    val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))


    val rf = new RandomForestClassifier()

    val pipeline = new Pipeline().setStages(Array(rf))

    val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)

    // this fails with error
    //java.lang.IllegalArgumentException: Field "features" does not exist.
    val cmModel = cv.fit(dataFixed) 
  }

}

Thanks for help!

zero323
Dusan Grubjesic
  • I am not familiar with Scala, but where are you setting the labels and features from the dataset, with something like LabeledPoint(labels, list(features))? Check the example in https://spark.apache.org/docs/latest/mllib-linear-methods.html – Abhishek Choudhary Jun 24 '15 at 15:44
  • @ABC, Please check my comment in the question below. – Dusan Grubjesic Jun 24 '15 at 17:05
  • Check this example https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala where val model = pipeline.fit(training.toDF()) passes a DataFrame to the pipeline – Abhishek Choudhary Jun 25 '15 at 11:36

3 Answers

45

As of Spark 1.4, you can use the transformer org.apache.spark.ml.feature.VectorAssembler. Just provide the names of the columns you want to use as features.

import org.apache.spark.ml.feature.VectorAssembler

val assembler = new VectorAssembler()
  .setInputCols(Array("col1", "col2", "col3"))
  .setOutputCol("features")

and add it to your pipeline.
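Applied to the columns from the question, the assembler step could look roughly like the sketch below. It is not part of the original answer: it assumes Spark 2.x or later (SparkSession instead of SQLContext), builds the sample rows in memory rather than reading the CSV, and the object name, helper names and the educationIndex / sexIndex column names are made up for illustration. The string columns go through StringIndexer first, as suggested in the comments, because VectorAssembler does not accept string input.

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical object and helper names, used only for this illustration.
object AssemblerPipelineSketch {

  // The five sample rows from the question, built in memory instead of read from CSV.
  def sampleData(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      (38, 40, "hs-grad", "male", "A"),
      (28, 40, "bachelors", "female", "A"),
      (52, 45, "hs-grad", "male", "B"),
      (31, 50, "masters", "female", "B"),
      (42, 40, "bachelors", "male", "B")
    ).toDF("age", "hours_per_week", "education", "sex", "salaryRange")
  }

  def buildPipeline(): Pipeline = {
    // StringIndexer maps each distinct string to a numeric (Double) index.
    def indexer(in: String, out: String): StringIndexer =
      new StringIndexer().setInputCol(in).setOutputCol(out)

    // Combine the numeric and the indexed columns into one vector column.
    val assembler = new VectorAssembler()
      .setInputCols(Array("age", "hours_per_week", "educationIndex", "sexIndex"))
      .setOutputCol("features")

    val rf = new RandomForestClassifier()
      .setLabelCol("label")
      .setFeaturesCol("features")

    new Pipeline().setStages(Array[PipelineStage](
      indexer("education", "educationIndex"),
      indexer("sex", "sexIndex"),
      indexer("salaryRange", "label"),
      assembler,
      rf
    ))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("AssemblerPipelineSketch")
      .master("local[*]")
      .getOrCreate()

    val data = sampleData(spark)
    val model = buildPipeline().fit(data)
    model.transform(data).select("features", "label", "prediction").show()

    spark.stop()
  }
}
```

Because the indexed columns come from StringIndexer, they carry metadata marking them as categorical, which tree-based classifiers in the ml package can read from the schema. Five rows are of course far too few for a meaningful model or for cross-validation; the sketch only shows how the stages fit together.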

WeiChing 林煒清
  • 1
    [tuxdna's answer](http://stackoverflow.com/a/31102246/1281433) explained the details of the problem, and what the solution has to look like. **This answer** shows the nice way to accomplish it. – Joshua Taylor Jan 15 '16 at 16:04
  • 1
    This would not work since some of the features are of type String. Great solution for strictly numerical data. – gstvolvr Jan 30 '16 at 19:43
  • 2
    @gstvolvr You'll need to use `StringIndexer` first to convert strings to numeric. Might be worth adding this step to the answer for clarity. – max Jul 30 '16 at 23:44
33

You simply need to make sure that you have a "features" column in your dataframe that is of type VectorUDT. As shown below, a plain integer column renamed to "features" is rejected:

scala> val df2 = dataFixed.withColumnRenamed("age", "features")
df2: org.apache.spark.sql.DataFrame = [features: int, hours_per_week: int, education: string, sex: string, salaryRange: string]

scala> val cmModel = cv.fit(df2) 
java.lang.IllegalArgumentException: requirement failed: Column features must be of type org.apache.spark.mllib.linalg.VectorUDT@1eef but was actually IntegerType.
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
    at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:50)
    at org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
    at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:118)
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:164)
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:164)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
    at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:164)
    at org.apache.spark.ml.tuning.CrossValidator.transformSchema(CrossValidator.scala:142)
    at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
    at org.apache.spark.ml.tuning.CrossValidator.fit(CrossValidator.scala:107)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:67)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:76)

EDIT1

Essentially, your data frame needs two fields: "features" for the feature vector and "label" for the instance labels. Labels must be of type Double.

To create a "features" field of Vector type, first create a UDF as shown below:

val toVec4    = udf[Vector, Int, Int, String, String] { (a,b,c,d) => 
  val e3 = c match {
    case "hs-grad" => 0
    case "bachelors" => 1
    case "masters" => 2
  }
  val e4 = d match {case "male" => 0 case "female" => 1}
  Vectors.dense(a, b, e3, e4) 
}

Now to also encode the "label" field, create another udf as shown below:

val encodeLabel    = udf[Double, String]( _ match { case "A" => 0.0 case "B" => 1.0} )

Now we transform the original dataframe using these two UDFs:

val df = dataFixed.withColumn(
  "features",
  toVec4(
    dataFixed("age"),
    dataFixed("hours_per_week"),
    dataFixed("education"),
    dataFixed("sex")
  )
).withColumn("label", encodeLabel(dataFixed("salaryRange"))).select("features", "label")

Note that there can be extra columns / fields present in the dataframe, but in this case I have selected only features and label:

scala> df.show()
+-------------------+-----+
|           features|label|
+-------------------+-----+
|[38.0,40.0,0.0,0.0]|  0.0|
|[28.0,40.0,1.0,1.0]|  0.0|
|[52.0,45.0,0.0,0.0]|  1.0|
|[31.0,50.0,2.0,1.0]|  1.0|
|[42.0,40.0,1.0,0.0]|  1.0|
+-------------------+-----+

Now it's up to you to set the correct parameters for your learning algorithm to make it work.
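One caveat about the pattern matches in toVec4 and encodeLabel: they only cover the values present in the sample, so any other category would throw a scala.MatchError at runtime. A minimal plain-Scala sketch of a lookup-based encoding that does not fail on unseen values is shown below. The names CategoryEncoding, buildIndex and encode are hypothetical, and returning -1.0 for unknown categories is an assumption made for illustration, not something the Spark API requires.

```scala
// Hypothetical helper object, not part of Spark.
object CategoryEncoding {

  // Assigns each distinct category an index in order of first appearance.
  def buildIndex(values: Seq[String]): Map[String, Int] =
    values.distinct.zipWithIndex.toMap

  // Returns the category's index as a Double, or `unknown` for values
  // that were not seen when the index was built.
  def encode(index: Map[String, Int], value: String, unknown: Double = -1.0): Double =
    index.get(value).map(_.toDouble).getOrElse(unknown)

  def main(args: Array[String]): Unit = {
    val education = Seq("hs-grad", "bachelors", "hs-grad", "masters", "bachelors")
    val index = buildIndex(education)

    println(encode(index, "hs-grad")) // prints 0.0
    println(encode(index, "masters")) // prints 2.0
    println(encode(index, "phd"))     // prints -1.0 (unseen category)
  }
}
```

The body of encode could be wrapped in a UDF in place of the match expressions, with the index built from the distinct values of the column.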

tuxdna
  • Any chance you can show how I can create a column named "features" of type VectorUDT from my data? – Dusan Grubjesic Jun 28 '15 at 18:22
  • 1
    @DusanGrubjesic: I have added code examples. Please check **EDIT1** – tuxdna Jun 29 '15 at 09:54
  • This is really great! I am just not sure how we can tell the classifier from ML that e3 and e4 are now categorical features, not numerical ones. In the "low level" mllib API it was possible to pass **categoricalFeaturesInfo** with the indexes and number of categories of the categorical features. In the "high level" ml API, this should be extracted directly from the schema. – Dusan Grubjesic Jun 30 '15 at 11:21
  • In this case the resulting `Vector` of `Double` values (all numerical) constitutes your feature vector. You may want to do standardization, one-hot encoding, normalization ... whatever you see fit for your algorithm, but the values in your feature vector all have to be `Double`. Which low-level API are you referring to? – tuxdna Jun 30 '15 at 11:30
  • There are 2 packages in Spark for machine learning. One is [mllib](https://spark.apache.org/docs/1.4.0/mllib-guide.html) -this one I call "low level api" and another is [ml](https://spark.apache.org/docs/latest/ml-guide.html) - this I call "high level api". Anyway , tuxdna thanks for help I will select your answer as best from all the others. – Dusan Grubjesic Jun 30 '15 at 12:35
  • 1
    @DusanGrubjesic: I am glad that it was helpful. And thanks for the distinction between mllib and ml :-) – tuxdna Jun 30 '15 at 13:28
0

According to the Spark documentation on mllib decision trees, it seems to me that you should define the feature map you are using, and each point should be a LabeledPoint.

This tells the algorithm which column should be used as the prediction target and which ones are the features.

https://spark.apache.org/docs/latest/mllib-decision-tree.html

Adriano Almeida
  • 1
    There is an old API located in the **mllib** package, and the points should indeed be LabeledPoint there. However, I am trying to use the new API located in the **ml** package because it supports pipelines, cross validation etc. This new API uses DataFrame as input. E.g. compare these two: [RandomForestClassifier](https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.ml.classification.RandomForestClassifier) from **ml**, which uses DataFrame, and [RandomForestModel](https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.tree.model.RandomForestModel) from **mllib** – Dusan Grubjesic Jun 24 '15 at 17:01