
I have been trying to build a simple random forest regression model in PySpark. I have decent experience with machine learning in R. However, ML in PySpark seems completely different to me, especially when it comes to handling categorical variables, string indexing, and one-hot encoding (when there are only numeric variables, I was able to perform RF regression just by following examples). While there are plenty of examples available for handling categorical variables, such as this and this, I have had no success with any of them, as most went over my head (probably because of my unfamiliarity with Python ML). I would be grateful to anyone who can help fix this.

Here is my attempt (the input file is here):

from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import Row
from pyspark.sql.functions import col, round
train = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('filename.csv')
train.cache()
train.dtypes

The output is:

DataFrame[ID: int, Country: string, Carrier: double, TrafficType: string, ClickDate: timestamp, Device: string, Browser: string, OS: string, RefererUrl: string, UserIp: string, ConversionStatus: string, ConversionDate: string, ConversionPayOut: string, publisherId: string, subPublisherId: string, advertiserCampaignId: double, Fraud: double]

Next I choose my variables of interest:

IMP = ["Country","Carrier","TrafficType","Device","Browser","OS","Fraud","ConversionPayOut"]
train = train.fillna("XXX")
train = train.select([column for column in train.columns if column in IMP])
from pyspark.sql.types import DoubleType
train = train.withColumn("ConversionPayOut", train["ConversionPayOut"].cast("double"))
train.cache()

Output is:

DataFrame[Country: string, Carrier: double, TrafficType: string, Device: string, Browser: string, OS: string, ConversionPayOut: double, Fraud: double]

My dependent variable is ConversionPayOut; previously a string type, it is now converted to a double.

From here starts my confusion: based on this post, I understood that I have to convert my categorical string-type variables to one-hot encoded vectors. Here is my attempt at that:

First, the StringIndexer step:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# index every categorical string column (everything except the numeric columns)
indexers = [StringIndexer(inputCol=column, outputCol=column + "_index").fit(train) for column in list(set(train.columns) - set(['Carrier', 'ConversionPayOut', 'Fraud']))]
pipeline = Pipeline(stages=indexers)
train_catind = pipeline.fit(train).transform(train)
train_catind.show()

Output of StringIndexing:


+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+
|Country|Carrier|TrafficType| Device|       Browser|     OS|  ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+
|     TH|   20.0|          A|   Lava|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         7.0|
|     BR|  217.0|          A|     LG|        chrome|Android|        26.2680574|  0.0|              0.0|          2.0|          0.0|     0.0|         5.0|
|     TH|   20.0|          A|Generic|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         0.0|


Next, I think I have to do the one-hot encoding of the string indexes:

from pyspark.ml.feature import OneHotEncoder

# one-hot encode every *_index column produced by the StringIndexer step
indexers_ON = [OneHotEncoder(inputCol=column, outputCol=column + "_Vec") for column in filter(lambda x: x.endswith('_index'), train_catind.columns)]
pipeline = Pipeline(stages=indexers_ON)
train_OHE = pipeline.fit(train_catind).transform(train_catind)
train_OHE.show()

The output after one-hot encoding looks like this:


+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+
|Country|Carrier|TrafficType| Device|       Browser|     OS|  ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|TrafficType_index_Vec|Country_index_Vec|Browser_index_Vec| OS_index_Vec|Device_index_Vec|
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+
|     TH|   20.0|          A|   Lava|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         7.0|        (1,[0],[1.0])|    (9,[1],[1.0])|    (5,[0],[1.0])|(1,[0],[1.0])|  (15,[7],[1.0])|
|     BR|  217.0|          A|     LG|        chrome|Android|        26.2680574|  0.0|              0.0|          2.0|          0.0|     0.0|         5.0|        (1,[0],[1.0])|    (9,[2],[1.0])|    (5,[0],[1.0])|(1,[0],[1.0])|  (15,[5],[1.0])|
|     TH|   20.0|          A|Generic|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         0.0|        (1,[0],[1.0])|    (9,[1],[1.0])|    (5,[0],[1.0])|(1,[0],[1.0])|  (15,[0],[1.0])|


I am clueless as to how to proceed from here. In fact, I am also unsure which Spark machine learning packages require this one-hot encoding and which ones do not.

It would be a great learning resource for all PySpark newbies if the Stack Overflow community could clarify how to go forward.

honeybadger

2 Answers


To run a random forest on your pre-processed data, you can proceed with the code below.

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor

# use VectorAssembler to combine all the feature columns into a single vector column
assemblerInputs = ["Carrier", "Fraud", "Country_index_Vec", "TrafficType_index_Vec", "Device_index_Vec", "Browser_index_Vec", "OS_index_Vec"]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
pipeline = Pipeline(stages=[assembler])
df = pipeline.fit(train_OHE).transform(train_OHE)
df = df.withColumn("label", train_OHE.ConversionPayOut)

# randomly split data into training and test datasets
(train_data, test_data) = df.randomSplit([0.7, 0.3], seed=111)

# train a random forest *regression* model, since ConversionPayOut is a continuous label
rf = RandomForestRegressor(labelCol="label", featuresCol="features")
rf_model = rf.fit(train_data)

# Make predictions on test data
predictions = rf_model.transform(test_data)
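
Since ConversionPayOut is continuous, the predictions can be checked with RegressionEvaluator; here is a minimal sketch (the RMSE metric choice is my assumption, not part of the original answer):

from pyspark.ml.evaluation import RegressionEvaluator

# compute RMSE on the held-out test predictions; "mae" and "r2" are also supported metrics
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE on test data = %g" % rmse)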


Hope this helps!

Prem
  • Thank you for the answer. This was similar to what I tried. But I ran into new errors after running the VectorAssembler. Can you please have a look at this question. https://stackoverflow.com/questions/46377686/how-to-match-and-replace-in-pyspark-when-columns-contain-vectors – honeybadger Sep 23 '17 at 21:13
  • @kasa Can you pls try this code snippet and let us know if you are still getting the same error? – Prem Sep 24 '17 at 07:28

Following is a comprehensive example in Scala (the data file is shared at https://drive.google.com/open?id=1z4YKyqIrLmWY1wNeqGrKVdTGfckqikDt):

package com.nik.spark.ml.examples.regression.randomForest

import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.classification.RandomForestClassifier

object RandomForestDemo {

  def main(args: Array[String]) {
    // Optional: Use the following code below to set the Error reporting
    import org.apache.log4j._
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Spark Session
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Use Spark to read in the adult census income csv file.
    val data = spark.read.option("header", "true").option("inferSchema", "true").format("csv").load("adult-training.csv")

    // Print the Schema of the DataFrame
    data.printSchema()

    ///////////////////////
    /// Display Data /////
    /////////////////////
    val colnames = data.columns
    val firstrow = data.head(1)(0)
    println("\n")
    println("Example Data Row")
    for (ind <- Range(1, colnames.length)) {
      println(colnames(ind))
      println(firstrow(ind))
      println("\n")
    }

    ////////////////////////////////////////////////////
    //// Setting Up DataFrame for Machine Learning ////
    //////////////////////////////////////////////////
    import spark.implicits._
    // Grab only the columns we want
    val logregdataall = data.select($"income", $"workclass", $"fnlwgt", $"education", $"education-num", $"marital-status", $"occupation", $"relationship", $"race", $"sex", $"capital-gain", $"capital-loss", $"hours-per-week", $"native-country")
    val logregdata = logregdataall.na.drop()

    // A few things we need to do before Spark can accept the data!
    // Convert categorical columns into a binary vector using one hot encoder
    // We need to deal with the Categorical columns

    // Import VectorAssembler and Vectors
    import org.apache.spark.ml.feature.{ VectorAssembler, StringIndexer, VectorIndexer, OneHotEncoder }
    import org.apache.spark.ml.linalg.Vectors

    // Deal with Categorical Columns
    // Index the string-type columns with StringIndexer
    val workclassIndexer = new StringIndexer().setInputCol("workclass").setOutputCol("workclassIndex")
    val educationIndexer = new StringIndexer().setInputCol("education").setOutputCol("educationIndex")
    val maritalStatusIndexer = new StringIndexer().setInputCol("marital-status").setOutputCol("maritalStatusIndex")
    val occupationIndexer = new StringIndexer().setInputCol("occupation").setOutputCol("occupationIndex")
    val relationshipIndexer = new StringIndexer().setInputCol("relationship").setOutputCol("relationshipIndex")
    val raceIndexer = new StringIndexer().setInputCol("race").setOutputCol("raceIndex")
    val sexIndexer = new StringIndexer().setInputCol("sex").setOutputCol("sexIndex")
    val nativeCountryIndexer = new StringIndexer().setInputCol("native-country").setOutputCol("nativeCountryIndex")
    val incomeIndexer = new StringIndexer().setInputCol("income").setOutputCol("incomeIndex")

    // One-hot encode the indexed columns
    val workclassEncoder = new OneHotEncoder().setInputCol("workclassIndex").setOutputCol("workclassVec")
    val educationEncoder = new OneHotEncoder().setInputCol("educationIndex").setOutputCol("educationVec")
    val maritalStatusEncoder = new OneHotEncoder().setInputCol("maritalStatusIndex").setOutputCol("maritalVec")
    val occupationEncoder = new OneHotEncoder().setInputCol("occupationIndex").setOutputCol("occupationVec")
    val relationshipEncoder = new OneHotEncoder().setInputCol("relationshipIndex").setOutputCol("relationshipVec")
    val raceEncoder = new OneHotEncoder().setInputCol("raceIndex").setOutputCol("raceVec")
    val sexEncoder = new OneHotEncoder().setInputCol("sexIndex").setOutputCol("sexVec")
    val nativeCountryEncoder = new OneHotEncoder().setInputCol("nativeCountryIndex").setOutputCol("nativeCountryVec")
    val incomeEncoder = new StringIndexer().setInputCol("incomeIndex").setOutputCol("label")

    // Assemble everything together to be ("label","features") format
    // use the encoded *Vec columns plus the numeric columns;
    // VectorAssembler cannot take raw string columns, and "income" is the label
    val assembler = (new VectorAssembler()
      .setInputCols(Array("workclassVec", "fnlwgt", "educationVec", "education-num", "maritalVec", "occupationVec", "relationshipVec", "raceVec", "sexVec", "capital-gain", "capital-loss", "hours-per-week", "nativeCountryVec"))
      .setOutputCol("features"))
    ////////////////////////////
    /// Split the Data ////////
    //////////////////////////
    val Array(training, test) = logregdata.randomSplit(Array(0.7, 0.3), seed = 12345)

    ///////////////////////////////
    // Set Up the Pipeline ///////
    /////////////////////////////
    import org.apache.spark.ml.Pipeline

    val rf = new RandomForestClassifier().setNumTrees(10)

    // the pipeline must include the indexer and encoder stages so the *Vec columns exist when the assembler runs
    val pipeline = new Pipeline().setStages(Array(workclassIndexer, educationIndexer, maritalStatusIndexer, occupationIndexer, relationshipIndexer, raceIndexer, sexIndexer, nativeCountryIndexer, incomeIndexer, workclassEncoder, educationEncoder, maritalStatusEncoder, occupationEncoder, relationshipEncoder, raceEncoder, sexEncoder, nativeCountryEncoder, incomeEncoder, assembler, rf))

    // Fit the pipeline to training documents.
    val model = pipeline.fit(training)
    // Get Results on Test Set
    val results = model.transform(test)

    ////////////////////////////////////
    //// MODEL EVALUATION /////////////
    //////////////////////////////////
    println("schema")
    println(results.select($"label").distinct().foreach { x => println(x) })

    // For Metrics and Evaluation
    import org.apache.spark.mllib.evaluation.MulticlassMetrics

    // Need to convert to RDD to use this
    val predictionAndLabels = results.select($"prediction", $"label").as[(Double, Double)].rdd

    // Instantiate metrics object
    val metrics = new MulticlassMetrics(predictionAndLabels)

    // Confusion matrix
    println("Confusion matrix:")
    println(metrics.confusionMatrix)
    println(metrics.accuracy)
  }
}
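
For comparison with the asker's PySpark setup, the same end-to-end idea might look like the sketch below (my assumptions: Spark 2.x, the train DataFrame from the question, and illustrative stage names):

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor

cat_cols = ["Country", "TrafficType", "Device", "Browser", "OS"]

# one StringIndexer and one OneHotEncoder per categorical column
indexers = [StringIndexer(inputCol=c, outputCol=c + "_index") for c in cat_cols]
encoders = [OneHotEncoder(inputCol=c + "_index", outputCol=c + "_vec") for c in cat_cols]

# combine the encoded vectors and the numeric columns into a single feature vector
assembler = VectorAssembler(
    inputCols=[c + "_vec" for c in cat_cols] + ["Carrier", "Fraud"],
    outputCol="features")

# a regressor, since ConversionPayOut is continuous
rf = RandomForestRegressor(labelCol="ConversionPayOut", featuresCol="features")

pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])
model = pipeline.fit(train)
predictions = model.transform(train)

Keeping all stages inside one Pipeline avoids the mismatch between assembler inputs and available columns that the commented-out variant above ran into.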
Nikhil Bhide