49

How do I handle categorical data with spark-ml and not spark-mllib?

Though the documentation is not very clear, it seems that classifiers, e.g. RandomForestClassifier and LogisticRegression, have a featuresCol argument, which specifies the name of the column of features in the DataFrame, and a labelCol argument, which specifies the name of the column of class labels in the DataFrame.

Obviously I want to use more than one feature in my prediction, so I tried using the VectorAssembler to put all my features in a single vector under featuresCol.

However, the VectorAssembler only accepts numeric types, boolean type, and vector type (according to the Spark website), so I can't put strings in my features vector.
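For illustration, a minimal sketch of the kind of failure I hit (toy column names, not my real data):

from pyspark.ml.feature import VectorAssembler

df = spark.createDataFrame([(0, 18.0, "US"), (1, 25.0, "FR")], ["id", "age", "country"])

# Fails because "country" is a StringType column, which VectorAssembler rejects,
# with an IllegalArgumentException along the lines of "Data type string ... is not supported"
assembler = VectorAssembler(inputCols=["age", "country"], outputCol="features")
assembler.transform(df)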

How should I proceed?

eliasah
Rainmaker

5 Answers

59

I just wanted to complete Holden's answer.

Since Spark 2.3.0, OneHotEncoder has been deprecated and will be removed in 3.0.0. Please use OneHotEncoderEstimator instead.

In Scala:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, StringIndexer}

val df = Seq((0, "a", 1), (1, "b", 2), (2, "c", 3), (3, "a", 4), (4, "a", 4), (5, "c", 3)).toDF("id", "category1", "category2")

val indexer = new StringIndexer().setInputCol("category1").setOutputCol("category1Index")
val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array(indexer.getOutputCol, "category2"))
  .setOutputCols(Array("category1Vec", "category2Vec"))

val pipeline = new Pipeline().setStages(Array(indexer, encoder))

pipeline.fit(df).transform(df).show
// +---+---------+---------+--------------+-------------+-------------+
// | id|category1|category2|category1Index| category1Vec| category2Vec|
// +---+---------+---------+--------------+-------------+-------------+
// |  0|        a|        1|           0.0|(2,[0],[1.0])|(4,[1],[1.0])|
// |  1|        b|        2|           2.0|    (2,[],[])|(4,[2],[1.0])|
// |  2|        c|        3|           1.0|(2,[1],[1.0])|(4,[3],[1.0])|
// |  3|        a|        4|           0.0|(2,[0],[1.0])|    (4,[],[])|
// |  4|        a|        4|           0.0|(2,[0],[1.0])|    (4,[],[])|
// |  5|        c|        3|           1.0|(2,[1],[1.0])|(4,[3],[1.0])|
// +---+---------+---------+--------------+-------------+-------------+

In Python:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

df = spark.createDataFrame([(0, "a", 1), (1, "b", 2), (2, "c", 3), (3, "a", 4), (4, "a", 4), (5, "c", 3)], ["id", "category1", "category2"])

indexer = StringIndexer(inputCol="category1", outputCol="category1Index")
inputs = [indexer.getOutputCol(), "category2"]
encoder = OneHotEncoderEstimator(inputCols=inputs, outputCols=["categoryVec1", "categoryVec2"])
pipeline = Pipeline(stages=[indexer, encoder])
pipeline.fit(df).transform(df).show()
# +---+---------+---------+--------------+-------------+-------------+
# | id|category1|category2|category1Index| categoryVec1| categoryVec2|
# +---+---------+---------+--------------+-------------+-------------+
# |  0|        a|        1|           0.0|(2,[0],[1.0])|(4,[1],[1.0])|
# |  1|        b|        2|           2.0|    (2,[],[])|(4,[2],[1.0])|
# |  2|        c|        3|           1.0|(2,[1],[1.0])|(4,[3],[1.0])|
# |  3|        a|        4|           0.0|(2,[0],[1.0])|    (4,[],[])|
# |  4|        a|        4|           0.0|(2,[0],[1.0])|    (4,[],[])|
# |  5|        c|        3|           1.0|(2,[1],[1.0])|(4,[3],[1.0])|
# +---+---------+---------+--------------+-------------+-------------+

Since Spark 1.4.0, MLlib also supplies an OneHotEncoder feature, which maps a column of label indices to a column of binary vectors with at most a single one-value.

This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features.

Let's consider the following DataFrame:

val df = Seq((0, "a"),(1, "b"),(2, "c"),(3, "a"),(4, "a"),(5, "c"))
            .toDF("id", "category")

The first step would be to create the indexed DataFrame with the StringIndexer:

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
                   .setInputCol("category")
                   .setOutputCol("categoryIndex")
                   .fit(df)

val indexed = indexer.transform(df)

indexed.show
// +---+--------+-------------+                                                    
// | id|category|categoryIndex|
// +---+--------+-------------+
// |  0|       a|          0.0|
// |  1|       b|          2.0|
// |  2|       c|          1.0|
// |  3|       a|          0.0|
// |  4|       a|          0.0|
// |  5|       c|          1.0|
// +---+--------+-------------+

You can then encode categoryIndex with OneHotEncoder:

import org.apache.spark.ml.feature.OneHotEncoder

val encoder = new OneHotEncoder()
                   .setInputCol("categoryIndex")
                   .setOutputCol("categoryVec")

val encoded = encoder.transform(indexed)

encoded.select("id", "categoryVec").show
// +---+-------------+
// | id|  categoryVec|
// +---+-------------+
// |  0|(2,[0],[1.0])|
// |  1|    (2,[],[])|
// |  2|(2,[1],[1.0])|
// |  3|(2,[0],[1.0])|
// |  4|(2,[0],[1.0])|
// |  5|(2,[1],[1.0])|
// +---+-------------+
Shaido
eliasah
  • Thanks, but I have 2 concerns: 1) Suppose I want to use decision trees, random forests, or anything else that can naturally handle categorical variables without binarizing them. What do I do in that case? 2) If I'm not wrong, StringIndexer assigns indices based on the frequency of each term. Does this mean that the training and testing sets will have different labels, making predictions meaningless? – Rainmaker Aug 28 '15 at 21:28
  • You have other kinds of indexers. Try looking for what you need in the official documentation concerning feature extraction with MLlib! You can find, for example, VectorIndexer – eliasah Aug 28 '15 at 21:29
  • Ok, it seems that VectorIndexer is what I was looking for. I wanted a RandomForestClassifier to treat categorical and continuous variables differently without explicitly creating binary vectors from the categorical variables. Also, it seems that my second concern was just wrong. StringIndexer assigns indices based on the frequency of each term in the training set. When the StringIndexerModel is used to transform the testing set, it retains the same index mappings from the training set, regardless of the frequency of terms in the testing set. Thanks for the help! – Rainmaker Aug 28 '15 at 23:58
  • If you are familiar with R, it behaves like as.factor, but a string is just given a numeric value corresponding to the string. – eliasah Jul 26 '16 at 12:13
  • The solution works great... but is there a way to bucket less frequent categories into `OTHER`, to keep the number of features at a manageable size? – anwartheravian Dec 05 '16 at 21:01
  • @Rainmaker: I had a follow-up question: imagine a model is trained and used in production, and every new record needs to be one-hot encoded and then given to the model. Will the index-to-category mapping remain the same? If a = 1 in the model, will a = 1 for new data when the model is loaded in Spark? – Anubhav Dikshit Apr 25 '17 at 06:35
  • @user3875610 I don't believe that Rainmaker will answer you. He hasn't been active for almost a year now. – eliasah Apr 25 '17 at 06:43
  • @eliasah can you please explain the structure of the categoryVec column – Amir Choubani May 28 '18 at 08:08
  • @AmirChoubani I'll refer you to my answer here https://stackoverflow.com/a/40506131/3415409 – eliasah May 28 '18 at 08:28
  • @eliasah so, it is a sparse representation. But I think that `a` should have (2, [0], [0.0]). Doesn't it? – Amir Choubani May 28 '18 at 08:37
  • @AmirChoubani no, zero elements are removed. Ref. https://en.m.wikipedia.org/wiki/Sparse_matrix – eliasah May 28 '18 at 08:41
  • @DavidArenburg Could you please explain the difference between StringIndexer and OneHotEncoder – Jon Andrews Mar 19 '19 at 09:34
  • @BasilPaul StringIndexer creates just an index that maps to the string, whereas the OHE creates a sparse vector in which one dimension maps to the string – eliasah Mar 19 '19 at 09:37
  • @eliasah Thank you for the valuable information. I have a dataset which I am trying to execute using StringIndexer and OHE. I got stuck at a point. Could you help me out with that? I am a beginner. Your help would really be appreciated – Jon Andrews Mar 19 '19 at 09:40
  • @BasilPaul can you try to post your question so I can see how I can help? – eliasah Mar 19 '19 at 09:47
  • Sure. https://stackoverflow.com/questions/55238409/problem-understanding-working-of-stringindexer-onehotencoder-and-assembler-pyspa – Jon Andrews Mar 19 '19 at 10:11
  • @eliasah I have posted the question. Kindly check – Jon Andrews Mar 19 '19 at 10:12
  • @eliasah In your example you should mention that the OneHotEncoderEstimator assumes that the categories start from 0. Hence the first category vector size is 2 and the second is 4. Also, it is worth mentioning that both are constructed with the default option `dropLast = true` - not an obvious assumption in general... – Oleg Jan 07 '20 at 10:23
  • With a column of strings that are pre-formatted, let's say we have 5 different strings in the column, do you have to run `StringIndexer()` as well as `OneHotEncoderEstimator()`? – Chuck Mar 15 '20 at 21:40
50

I am going to provide an answer from another perspective, since I was also wondering about categorical features with regard to tree-based models in Spark ML (not MLlib), and the documentation is not that clear about how everything works.

When you transform a column in your DataFrame using pyspark.ml.feature.StringIndexer, extra metadata gets stored in the DataFrame that specifically marks the transformed feature as a categorical feature.

When you print the DataFrame you will see a numeric value (an index that corresponds to one of your categorical values), and if you look at the schema you will see that the new transformed column is of type double. However, this new column created with pyspark.ml.feature.StringIndexer.transform is not just a normal double column; it has extra metadata associated with it that is very important. You can inspect this metadata by looking at the metadata property of the appropriate field in your DataFrame's schema (you can access the schema objects of your DataFrame via yourdataframe.schema).
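For instance, here is a minimal PySpark sketch of inspecting that metadata (toy data; the exact dict layout may vary slightly across Spark versions):

from pyspark.ml.feature import StringIndexer

df = spark.createDataFrame([(0, "a"), (1, "b"), (2, "c")], ["id", "category"])
indexed = StringIndexer(inputCol="category", outputCol="categoryIndex").fit(df).transform(df)

# The 'ml_attr' entry marks the column as a nominal (categorical) attribute,
# e.g. {'ml_attr': {'vals': ['a', 'b', 'c'], 'type': 'nominal', 'name': 'categoryIndex'}}
print(indexed.schema["categoryIndex"].metadata)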

This extra metadata has two important implications:

  1. When you call .fit() with a tree-based model, it will scan the metadata of your DataFrame and recognize fields that you encoded as categorical with transformers such as pyspark.ml.feature.StringIndexer (as noted above, there are other transformers that also have this effect, such as pyspark.ml.feature.VectorIndexer). Because of this, you DO NOT have to one-hot encode your features after you have transformed them with StringIndexer when using tree-based models in Spark ML (however, you still have to perform one-hot encoding when using other models that do not naturally handle categorical features, like linear regression, etc.).

  2. Because this metadata is stored in the DataFrame, you can use pyspark.ml.feature.IndexToString to reverse the numeric indices back to the original categorical values (which are often strings) at any time, as sketched below.
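A companion sketch reversing the indices with IndexToString, reusing the indexed DataFrame from the snippet above; because the labels live in the column metadata, no explicit labels parameter is needed:

from pyspark.ml.feature import IndexToString

converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converter.transform(indexed).select("id", "originalCategory").show()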

hamel
  • Could you please point me to the source code where it scans the metadata of the DataFrame for any tree-based algorithm? Also, would it make sense to use RFormula + a tree-based algorithm in a pipeline? RFormula internally uses StringIndexer + OneHotEncoder + VectorAssembler. – m-bhole Jun 15 '17 at 08:00
  • @hadooper: https://github.com/apache/spark/blob/v2.2.0/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala#L120 – hamel Aug 11 '17 at 06:59
  • But if GBTClassifier expects the dataframe to have just two columns, "label" and "features", and the "features" column should be of type Vector with its values of type double, as I understand, how can the metadata created by StringIndexer be passed into GBTClassifier? – Dmitri Lihhatsov Mar 25 '18 at 22:25
  • With a column of strings, do you have to run `StringIndexer()` as well as `OneHotEncoderEstimator()`? – Chuck Mar 15 '20 at 21:40
7

There is a component of the ML pipeline called StringIndexer you can use to convert your strings to Doubles in a reasonable way. http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.feature.StringIndexer has more documentation, and http://spark.apache.org/docs/latest/ml-guide.html shows how to construct pipelines.
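A minimal PySpark sketch of that (toy column names; the Scala API is analogous):

from pyspark.ml.feature import StringIndexer

df = spark.createDataFrame([(0, "us"), (1, "fr"), (2, "us")], ["id", "country"])
model = StringIndexer(inputCol="country", outputCol="countryIndex").fit(df)
model.transform(df).show()
# The most frequent string gets index 0.0, so "us" -> 0.0 and "fr" -> 1.0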

Holden
  • With a column of strings, do you have to run `StringIndexer()` as well as `OneHotEncoderEstimator()`? Or can you just run the latter? – Chuck Mar 15 '20 at 21:41
0

I use the following method for one-hot encoding a single column in a Spark DataFrame:

def ohcOneColumn(df, colName, debug=False):
  # Replaces colName with one boolean column per distinct value
  colsToFillNa = []

  if debug: print("Entering method ohcOneColumn")
  countUnique = df.groupBy(colName).count().count()
  if debug: print(countUnique)

  collectOnce = df.select(colName).distinct().collect()
  for uniqueValIndex in range(countUnique):
    uniqueVal = collectOnce[uniqueValIndex][0]
    if debug: print(uniqueVal)
    newColName = str(colName) + '_' + str(uniqueVal) + '_TF'
    # True where the row matches this distinct value; nulls are filled below
    df = df.withColumn(newColName, df[colName] == uniqueVal)
    colsToFillNa.append(newColName)
  df = df.drop(colName)
  df = df.na.fill(False, subset=colsToFillNa)
  return df

I use the following method for one-hot encoding entire Spark DataFrames:

def detectAndLabelCat(sparkDf, minValCount=5, debug=False, excludeCols=['Target']):
  # One-hot encodes every column that looks categorical: string-typed columns
  # and columns with fewer than minValCount distinct values
  if debug: print("Entering method detectAndLabelCat")
  newDf = sparkDf

  for colName in sparkDf.columns:
    uniqueVals = sparkDf.groupBy(colName).count()
    if debug: print(uniqueVals)
    countUnique = uniqueVals.count()
    dtype = str(sparkDf.schema[colName].dataType)
    if colName in excludeCols:
      if debug: print(str(colName) + ' is in the excluded columns list.')

    elif countUnique == 1:
      # A constant column carries no information
      newDf = newDf.drop(colName)
      if debug:
        print('dropping column ' + str(colName) + ' because it only contains one unique value.')

    elif (countUnique < minValCount) or (dtype == "String") or (dtype == "StringType"):
      if debug:
        print(len(newDf.columns))
        oldColumns = newDf.columns
      newDf = ohcOneColumn(newDf, colName, debug=debug)
      if debug:
        print(len(newDf.columns))
        newColumns = set(newDf.columns) - set(oldColumns)
        print('Adding:')
        print(newColumns)
        for newColumn in newColumns:
          if newColumn in newDf.columns:
            try:
              newUniqueValCount = newDf.groupBy(newColumn).count().count()
              print("There are " + str(newUniqueValCount) + " unique values in " + str(newColumn))
            except Exception:
              print('Uncaught error discussing ' + str(newColumn))

        print('Dropping:')
        print(set(oldColumns) - set(newDf.columns))

    else:
      if debug: print('Nothing done for column ' + str(colName))

  return newDf
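A usage sketch, adapted from the author's own test snippet in the comment below:

tdf = spark.createDataFrame(
    [('horse', 'orange'), ('cow', 'apple'), ('pig', 'orange'),
     ('horse', 'pineapple'), ('horse', 'orange'), ('pig', 'apple')],
    ["animalType", "fruitType"])

# One-hot encode a single column...
newDf = ohcOneColumn(tdf, "animalType", debug=False)
newDf.show()

# ...or detect and encode every categorical column at once
newerDf = detectAndLabelCat(tdf, debug=False)
newerDf.show()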
Jim
  • To test the above methods, I use the following: tdf = spark.createDataFrame([ ('horse', 'orange'), ('cow', 'apple'), ('pig', 'orange'), ('horse', 'pineapple'), ('horse', 'orange'), ('pig', 'apple') ], ["animalType", "fruitType"]) tdf.show() newDf = ohcOneColumn(tdf, "animalType", debug=False) newDf.show() newerDf = detectAndLabelCat(tdf, debug=False) newerDf.show() – Jim Jul 17 '19 at 18:03
-2

You can cast a string column type in a Spark DataFrame to a numerical data type using the cast function.

from pyspark.sql import SQLContext
from pyspark.sql.types import DoubleType, IntegerType

sqlContext = SQLContext(sc)
dataset = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('./data/titanic.csv')   

dataset = dataset.withColumn("Age", dataset["Age"].cast(DoubleType()))
dataset = dataset.withColumn("Survived", dataset["Survived"].cast(IntegerType()))

In the above example, we read in a CSV file as a DataFrame, cast the default string datatypes into integer and double, and overwrite the original DataFrame. We can then use VectorAssembler to merge the features into a single vector and apply your favorite Spark ML algorithm.
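For example, a minimal sketch of that last step ("Fare" is another numeric Titanic column, assumed here; like "Age", it loads as a string and needs casting first):

from pyspark.ml.feature import VectorAssembler

dataset = dataset.withColumn("Fare", dataset["Fare"].cast(DoubleType()))

# Rows containing nulls would need handling (e.g. dataset.na.drop()) before assembling
assembler = VectorAssembler(inputCols=["Age", "Fare"], outputCol="features")
assembler.transform(dataset).select("features", "Survived").show(5)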

Shaido
Vadim Smolyakov