57

I have a PySpark dataframe

+-------+--------------+----+----+
|address|          date|name|food|
+-------+--------------+----+----+
|1111111|20151122045510| Yin|gre |
|1111111|20151122045501| Yin|gre |
|1111111|20151122045500| Yln|gra |
|1111112|20151122065832| Yun|ddd |
|1111113|20160101003221| Yan|fdf |
|1111111|20160703045231| Yin|gre |
|1111114|20150419134543| Yin|fdf |
|1111115|20151123174302| Yen|ddd |
|2111115|      20123192| Yen|gre |
+-------+--------------+----+----+

that I want to transform to use with pyspark.ml. I can use a StringIndexer to convert the name column to a numeric category:

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="name", outputCol="name_index").fit(df)
df_ind = indexer.transform(df)
df_ind.show()
+-------+--------------+----+----------+----+
|address|          date|name|name_index|food|
+-------+--------------+----+----------+----+
|1111111|20151122045510| Yin|       0.0|gre |
|1111111|20151122045501| Yin|       0.0|gre |
|1111111|20151122045500| Yln|       2.0|gra |
|1111112|20151122065832| Yun|       4.0|ddd |
|1111113|20160101003221| Yan|       3.0|fdf |
|1111111|20160703045231| Yin|       0.0|gre |
|1111114|20150419134543| Yin|       0.0|fdf |
|1111115|20151123174302| Yen|       1.0|ddd |
|2111115|      20123192| Yen|       1.0|gre |
+-------+--------------+----+----------+----+

How can I transform several columns with StringIndexer (for example, name and food, each with its own StringIndexer) and then use VectorAssembler to generate a feature vector? Or do I have to create a StringIndexer for each column?

** EDIT **: This is not a dupe because I need to do this programmatically for several data frames with different column names. I can't use VectorIndexer or VectorAssembler because the columns are not numerical.

** EDIT 2**: A tentative solution is

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df).transform(df) for column in df.columns ]

This creates a list of dataframes, one per column, each identical to the original plus one transformed column. Now I need to join them to form the final dataframe, but that's very inefficient.

Ivan
  • 1
    Possible duplicate of [Encode and assemble multiple features in PySpark](http://stackoverflow.com/questions/32982425/encode-and-assemble-multiple-features-in-pyspark) – zero323 Apr 29 '16 at 16:02
  • 1
    Similar but not really. He's using one string indexer per column, one at a time, and I need to handle several columns at the same time, without doing each separately – Ivan Apr 29 '16 at 16:14
  • Then it is not possible. What would the output even be? – zero323 Apr 29 '16 at 16:17
  • OK, that's what I was looking for. I will code a UDF then. – Ivan Apr 29 '16 at 16:39
  • Don't you want something more like `CountVectorizer`? – zero323 Apr 29 '16 at 16:42
  • Yeah, it would be similar to sklearn's `CountVectorizer`, but for several columns, not a list in a column. – Ivan Apr 29 '16 at 17:06
  • You could gather columns with `array("some_col", "other_col")` and use `CountVectorizer` but I don't think it really makes that much sense. – zero323 Apr 30 '16 at 09:41

4 Answers

100

The best way that I've found to do it is to combine several StringIndexer instances in a list and use a Pipeline to execute them all:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in list(set(df.columns)-set(['date'])) ]


pipeline = Pipeline(stages=indexers)
df_r = pipeline.fit(df).transform(df)

df_r.show()
+-------+--------------+----+----+----------+----------+-------------+
|address|          date|food|name|food_index|name_index|address_index|
+-------+--------------+----+----+----------+----------+-------------+
|1111111|20151122045510| gre| Yin|       0.0|       0.0|          0.0|
|1111111|20151122045501| gra| Yin|       2.0|       0.0|          0.0|
|1111111|20151122045500| gre| Yln|       0.0|       2.0|          0.0|
|1111112|20151122065832| gre| Yun|       0.0|       4.0|          3.0|
|1111113|20160101003221| gre| Yan|       0.0|       3.0|          1.0|
|1111111|20160703045231| gre| Yin|       0.0|       0.0|          0.0|
|1111114|20150419134543| gre| Yin|       0.0|       0.0|          5.0|
|1111115|20151123174302| ddd| Yen|       1.0|       1.0|          2.0|
|2111115|      20123192| ddd| Yen|       1.0|       1.0|          4.0|
+-------+--------------+----+----+----------+----------+-------------+
Ivan
  • 1
    I have an iteration loop with reassignments inside. Ugliest code in the universe.. Guess it's time to invest a little effort and switch to `Pipelines` once and for all! – avloss Dec 03 '16 at 20:45
  • 28
    Do you really need to `fit` in `indexers`? You are running `fit` in the `pipeline` anyway. – David Arenburg Jan 30 '17 at 13:05
  • 1
    I tried this solution, but I found no difference from a for-loop: I still got multiple Spark jobs, just as with the for-loop – cinqS Dec 17 '18 at 09:01
5

With PySpark 3.0+ this is easier: StringIndexer accepts the inputCols and outputCols options, so a single indexer can handle several columns. See https://spark.apache.org/docs/latest/ml-features#stringindexer

class pyspark.ml.feature.StringIndexer(
    inputCol=..., 
    outputCol=..., 
    inputCols=..., 
    outputCols=..., 
    handleInvalid='error', 
    stringOrderType='frequencyDesc'
)
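As a rough sketch of how these options might be used on the question's dataframe: the helper names `index_output_names` and `build_multi_indexer` are hypothetical, the `_index` suffix simply follows the naming used elsewhere on this page, and the code assumes PySpark 3.0 or later with an active SparkSession.

```python
def index_output_names(columns, suffix="_index"):
    """Return one output column name per input column (hypothetical helper)."""
    return [f"{column}{suffix}" for column in columns]


def build_multi_indexer(columns, handle_invalid="error"):
    """Build one unfitted StringIndexer covering several columns.

    Assumes PySpark 3.0+, where StringIndexer accepts inputCols/outputCols,
    and an active SparkSession when this function is called.
    """
    # Imported inside the function so the naming helper above works without Spark.
    from pyspark.ml.feature import StringIndexer

    return StringIndexer(
        inputCols=list(columns),
        outputCols=index_output_names(columns),
        handleInvalid=handle_invalid,
    )


# Usage sketch, assuming `df` is the dataframe from the question:
# indexer = build_multi_indexer(["name", "food"])
# df_ind = indexer.fit(df).transform(df)
# df_ind.show()
```

Each input column still gets its own label-to-index mapping; the difference is that a single estimator is fitted instead of one per column.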
Nic Scozzaro
4

I can offer the following solution. It is better to use pipelines for this kind of transformation on larger data sets. They also make your code a lot easier to follow and understand. You can add more stages to the pipeline if you need to, for example an encoder.

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# create a list of the columns that are string typed
categoricalColumns = [item[0] for item in df.dtypes if item[1].startswith('string')]

# define a list of stages in your pipeline; each string indexer will be one stage
stages = []

# iterate through all categorical columns
for categoricalCol in categoricalColumns:
    # create a string indexer for this column; the output column name gets the suffix 'Index'
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Index')

    # append the string indexer to our list of stages
    stages.append(stringIndexer)

# create the pipeline, passing the stages list to the keyword argument stages
pipeline = Pipeline(stages=stages)
# fit the pipeline to our dataframe
pipelineModel = pipeline.fit(df)
# transform the dataframe
df = pipelineModel.transform(df)
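Since the question also asks about VectorAssembler, here is a minimal sketch of how the pipeline above might be extended with an assembler as a final stage. The function names `string_columns` and `build_feature_pipeline` and the output column name `features` are assumptions made for illustration, not part of the original answer.

```python
def string_columns(dtypes):
    """Pick the names of string-typed columns from (name, type) pairs such as df.dtypes."""
    return [name for name, dtype in dtypes if dtype == "string"]


def build_feature_pipeline(categorical_cols, features_col="features"):
    """Build an unfitted Pipeline: one StringIndexer per column, then a VectorAssembler.

    Hypothetical helper; needs an active SparkSession when called.
    """
    # Imported inside the function so string_columns stays usable without Spark.
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, VectorAssembler

    indexed_cols = [col + "Index" for col in categorical_cols]
    stages = [
        StringIndexer(inputCol=col, outputCol=out)
        for col, out in zip(categorical_cols, indexed_cols)
    ]
    # The assembler packs the numeric index columns into a single vector column.
    stages.append(VectorAssembler(inputCols=indexed_cols, outputCol=features_col))
    return Pipeline(stages=stages)


# Usage sketch, assuming `df` is the dataframe from the question:
# pipeline = build_feature_pipeline(string_columns(df.dtypes))
# df_features = pipeline.fit(df).transform(df)
```

Note that the assembled vector holds the raw category indices, which a model may read as ordered numbers; an encoder stage between the indexers and the assembler is one way to avoid that.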

Please have a look at my reference

DataBach
1

To apply StringIndexer to several columns in a PySpark DataFrame with Spark 2.4.7:

from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

indexers = [
    StringIndexer(inputCol="F1", outputCol="F1Index"),
    StringIndexer(inputCol="F5", outputCol="F5Index"),
]


pipeline = Pipeline(stages=indexers)
DF6 = pipeline.fit(DF5).transform(DF5)

DF6.show()