
I want to apply PySpark's MinMaxScaler to multiple columns of a PySpark DataFrame df. So far, I only know how to apply it to a single column, e.g. x.

import pandas as pd
from pyspark.ml.feature import MinMaxScaler

pdf = pd.DataFrame({'x':range(3), 'y':[1,2,5], 'z':[100,200,1000]})
df = spark.createDataFrame(pdf)

scaler = MinMaxScaler(inputCol="x", outputCol="x")
scalerModel = scaler.fit(df)
scaledData = scalerModel.transform(df)

What if I have 100 columns? Is there any way to do min-max scaling for many columns in PySpark?

Update:

Also, how do I apply MinMaxScaler to integer or double columns? It throws the following error:

java.lang.IllegalArgumentException: requirement failed: Column length must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually int.
Fluxy

2 Answers


Question 1:

Here is how to change your example so it runs properly. You need to assemble the data into a vector column for MinMaxScaler (and the other ML transformers) to work.

import pandas as pd

from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

pdf = pd.DataFrame({'x':range(3), 'y':[1,2,5], 'z':[100,200,1000]})
df = spark.createDataFrame(pdf)

assembler = VectorAssembler(inputCols=["x"], outputCol="x_vec")
scaler = MinMaxScaler(inputCol="x_vec", outputCol="x_scaled")
pipeline = Pipeline(stages=[assembler, scaler])
scalerModel = pipeline.fit(df)
scaledData = scalerModel.transform(df)
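
With this pipeline, scaledData keeps the original x, y and z columns and adds x_vec (the assembled vector) and x_scaled (the min-max scaled value, as a one-element vector).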

Question 2:

To run MinMaxScaler on multiple columns, you can use a Pipeline that receives a list of transformations prepared with a list comprehension:

from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
columns_to_scale = ["x", "y", "z"]
assemblers = [VectorAssembler(inputCols=[col], outputCol=col + "_vec") for col in columns_to_scale]
scalers = [MinMaxScaler(inputCol=col + "_vec", outputCol=col + "_scaled") for col in columns_to_scale]
pipeline = Pipeline(stages=assemblers + scalers)
scalerModel = pipeline.fit(df)
scaledData = scalerModel.transform(df)

Check this example pipeline in the official documentation.

You will end up with results in this format:

>>> scaledData.printSchema() 
root
 |-- x: long (nullable = true)
 |-- y: long (nullable = true)
 |-- z: long (nullable = true)
 |-- x_vec: vector (nullable = true)
 |-- y_vec: vector (nullable = true)
 |-- z_vec: vector (nullable = true)
 |-- x_scaled: vector (nullable = true)
 |-- y_scaled: vector (nullable = true)
 |-- z_scaled: vector (nullable = true)

>>> scaledData.show()
+---+---+----+-----+-----+--------+--------+--------+--------------------+
|  x|  y|   z|x_vec|y_vec|   z_vec|x_scaled|y_scaled|            z_scaled|
+---+---+----+-----+-----+--------+--------+--------+--------------------+
|  0|  1| 100|[0.0]|[1.0]| [100.0]|   [0.0]|   [0.0]|               [0.0]|
|  1|  2| 200|[1.0]|[2.0]| [200.0]|   [0.5]|  [0.25]|[0.1111111111111111]|
|  2|  5|1000|[2.0]|[5.0]|[1000.0]|   [1.0]|   [1.0]|               [1.0]|
+---+---+----+-----+-----+--------+--------+--------+--------------------+

Extra Post-processing:

You can recover the columns in their original names with some post-processing. For example:

from pyspark.sql import functions as f
names = {x + "_scaled": x for x in columns_to_scale}
scaledData = scaledData.select([f.col(c).alias(names[c]) for c in names.keys()])

The output will be:

scaledData.show()
+------+-----+--------------------+
|     y|    x|                   z|
+------+-----+--------------------+
| [0.0]|[0.0]|               [0.0]|
|[0.25]|[0.5]|[0.1111111111111111]|
| [1.0]|[1.0]|               [1.0]|
+------+-----+--------------------+
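
If you also want plain numeric values instead of the single-element vectors shown above, here is a minimal follow-up sketch, assuming Spark 3.0+ (which provides pyspark.ml.functions.vector_to_array; on older versions you would need a udf, as discussed in the comments below):

from pyspark.ml.functions import vector_to_array  # available in Spark 3.0+
from pyspark.sql import functions as f

# Unwrap each single-element scaled vector into a plain double column.
unwrapped = scaledData.select(
    [vector_to_array(f.col(c)).getItem(0).alias(c) for c in scaledData.columns]
)
unwrapped.show()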
Mohamed Ali JAMAOUI
  • Thanks! I tested your solution. I get an error: `java.lang.IllegalArgumentException: requirement failed: Column length must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually int.` Could you please advise me how to fix it? – Fluxy Feb 18 '20 at 13:31
  • I executed `printSchema()`. All columns are numeric: integer or double, e.g. `col1: integer (nullable = true)` – Fluxy Feb 18 '20 at 13:33
  • It looks like it is necessary to encode the data into `Vectors.dense`. Could you please show a complete example with `Vectors.dense`? – Fluxy Feb 18 '20 at 13:34
  • How can I get the values out of `[ ]` in the final result? – Fluxy Feb 18 '20 at 19:03
  • @Fluxy you can use a udf like in this answer https://stackoverflow.com/a/44505571/1762211 – Mohamed Ali JAMAOUI Feb 18 '20 at 19:58
  • Thanks. I tested it and it worked well. Can you please show how to apply this `udf` to all columns of a PySpark DataFrame? – Fluxy Feb 18 '20 at 20:25
  • You can post your answer here (my new question): https://stackoverflow.com/questions/60287632/how-to-convert-single-value-vectors-into-numeric-values-in-dataframe-cells and I will accept it. – Fluxy Feb 18 '20 at 20:28
  • @Fluxy I answered your new question. Also, please don't change your question to ask a different question; that makes the whole discussion unusable for other people who have the same original question. – Mohamed Ali JAMAOUI Feb 19 '20 at 09:37
  • VectorAssembler is not in pyspark.ml.linalg but in pyspark.ml.feature, I guess: https://spark.apache.org/docs/2.4.4/api/python/pyspark.ml.html#pyspark.ml.feature.VectorAssembler – Yosher Feb 18 '21 at 02:25
  • https://stackoverflow.com/questions/38384347/how-to-split-vector-into-columns-using-pyspark This can prove helpful – Anmol Deep Jul 15 '22 at 06:07

You could use a single MinMaxScaler instance on a vector-assembled set of features, rather than creating one MinMaxScaler per column you want to transform (scale, in this case).

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler

#1. Your original dataset
#pdf = pd.DataFrame({'x':range(3), 'y':[1,2,5], 'z':[100,200,1000]})
#df = spark.createDataFrame(pdf)

df = spark.createDataFrame([(0, 10.0, 0.1), (1, 1.0, 0.20), (2, 1.0, 0.9)],["x", "y", "z"])

df.show()
+---+----+---+
|  x|   y|  z|
+---+----+---+
|  0|10.0|0.1|
|  1| 1.0|0.2|
|  2| 1.0|0.9|
+---+----+---+

#2. Vector assembled set of features 
# (assemble only the columns you want to MinMax Scale)
assembler = VectorAssembler(inputCols=["x", "y", "z"], outputCol="features")
output = assembler.transform(df)

output.show()

+---+----+---+--------------+
|  x|   y|  z|      features|
+---+----+---+--------------+
|  0|10.0|0.1|[0.0,10.0,0.1]|
|  1| 1.0|0.2| [1.0,1.0,0.2]|
|  2| 1.0|0.9| [2.0,1.0,0.9]|
+---+----+---+--------------+

#3. Applying MinMaxScaler to your assembled features 
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
# rescale each feature to range [min, max].
scaledData = scaler.fit(output).transform(output)
scaledData.show()

+---+----+---+--------------+---------------+
|  x|   y|  z|      features| scaledFeatures|
+---+----+---+--------------+---------------+
|  0|10.0|0.1|[0.0,10.0,0.1]|  [0.0,1.0,0.0]|
|  1| 1.0|0.2| [1.0,1.0,0.2]|[0.5,0.0,0.125]|
|  2| 1.0|0.9| [2.0,1.0,0.9]|  [1.0,0.0,1.0]|
+---+----+---+--------------+---------------+
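
If you later need each scaled value back as its own column rather than one vector, here is a minimal sketch, again assuming Spark 3.0+ for pyspark.ml.functions.vector_to_array:

from pyspark.ml.functions import vector_to_array  # available in Spark 3.0+
from pyspark.sql import functions as F

# Split the scaled vector back into one column per original feature.
cols = ["x", "y", "z"]
split = scaledData.withColumn("scaled_arr", vector_to_array("scaledFeatures")) \
                  .select(cols + [F.col("scaled_arr")[i].alias(c + "_scaled")
                                  for i, c in enumerate(cols)])
split.show()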

Hope this helps.

conflicted_user