MRE:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, MinMaxScaler, StringIndexer, OneHotEncoder

spark = SparkSession.builder.getOrCreate()

df = pd.DataFrame({
    "age": [39, 50, 38, 53, 28, 37, 49, 52, 33, 22],
    "education": [' Bachelors', ' Bachelors', ' HS-grad', ' 11th', ' Bachelors', ' Masters',
                  ' 9th', ' HS-grad', ' Masters', ' Bachelors'],
    "workclass": [' State-gov', ' Self-emp-not-inc', ' Private', ' Private', ' Private',
                  ' Private', ' Private', ' Self-emp-not-inc', ' Private', ' Self-emp-not-inc']
})
df = spark.createDataFrame(df)

num_cols = ["age"]
cat_cols = ["education"]

# Numeric columns: wrap each one in a one-element vector, then min-max scale it
asmbler = [VectorAssembler(inputCols=[col], outputCol=col + "_vec", handleInvalid='skip')
           for col in num_cols]
mm_scaler = [MinMaxScaler(inputCol=col + "_vec", outputCol=col + "_mm_scaled")
             for col in num_cols]
# Categorical columns: map strings to numeric indices, then one-hot encode the indices
str_indexer = [StringIndexer(inputCol=col, outputCol=col + "_si", handleInvalid="skip")
               for col in cat_cols]
ohe = [OneHotEncoder(inputCol=f"{col}_si", outputCol=f"{col}_ohe", dropLast=True)
       for col in cat_cols]

stages = asmbler + mm_scaler + str_indexer + ohe
pp_pl = Pipeline(stages=stages).fit(df)
pp_pl.transform(df).show() looks like this:
+---+----------+-----------------+-------+--------------------+------------+-------------+
|age| education| workclass|age_vec| age_mm_scaled|education_si|education_ohe|
+---+----------+-----------------+-------+--------------------+------------+-------------+
| 39| Bachelors| State-gov| [39.0]|[0.5483870967741935]| 0.0|(4,[0],[1.0])|
| 50| Bachelors| Self-emp-not-inc| [50.0]|[0.9032258064516129]| 0.0|(4,[0],[1.0])|
| 38| HS-grad| Private| [38.0]|[0.5161290322580645]| 1.0|(4,[1],[1.0])|
| 53| 11th| Private| [53.0]| [1.0]| 3.0|(4,[3],[1.0])|
| 28| Bachelors| Private| [28.0]|[0.1935483870967742]| 0.0|(4,[0],[1.0])|
| 37| Masters| Private| [37.0]|[0.4838709677419355]| 2.0|(4,[2],[1.0])|
| 49| 9th| Private| [49.0]|[0.8709677419354839]| 4.0| (4,[],[])|
| 52| HS-grad| Self-emp-not-inc| [52.0]| [0.967741935483871]| 1.0|(4,[1],[1.0])|
| 33| Masters| Private| [33.0]|[0.3548387096774194]| 2.0|(4,[2],[1.0])|
| 22| Bachelors| Self-emp-not-inc| [22.0]| [0.0]| 0.0|(4,[0],[1.0])|
+---+----------+-----------------+-------+--------------------+------------+-------------+
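As a sanity check on the age_mm_scaled column: MinMaxScaler with its default target range [0, 1] rescales each value as (x - min) / (max - min) * (max_target - min_target) + min_target, with min and max taken over the fitted data. The pure-Python sketch below (no Spark needed; `min_max_scale` is a hypothetical helper, not a Spark API) reproduces the values in the table up to floating-point rounding. The constant-column branch follows my reading of the Spark feature docs and is an assumption.

```python
# Ages from the MRE above
ages = [39, 50, 38, 53, 28, 37, 49, 52, 33, 22]

def min_max_scale(values, new_min=0.0, new_max=1.0):
    """Linearly rescale values so min(values) -> new_min and max(values) -> new_max."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # Constant column (assumed Spark behavior): every value maps to the midpoint
        return [0.5 * (new_max + new_min)] * len(values)
    return [(x - lo) / (hi - lo) * (new_max - new_min) + new_min for x in values]

scaled = min_max_scale(ages)
for age, s in zip(ages, scaled):
    print(age, s)
```

Here min = 22 and max = 53, so age 39 maps to 17 / 31 and age 53 maps to exactly 1.0, matching the first and fourth rows.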
printSchema() output:
root
 |-- age: long (nullable = true)
 |-- education: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- age_vec: vector (nullable = true)
 |-- age_mm_scaled: vector (nullable = true)
 |-- education_si: double (nullable = false)
 |-- education_ohe: vector (nullable = true)
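The education_si and education_ohe columns follow from two defaults. StringIndexer's default stringOrderType is "frequencyDesc" (most frequent label gets index 0), and ties appear to be broken alphabetically, which is what the Spark 3.x docs describe; treat that tie rule as an assumption for your version. OneHotEncoder(dropLast=True) then drops the last index, so ' 9th' (index 4) becomes the empty vector (4,[],[]). The pure-Python sketch below illustrates that logic; `string_index` and `one_hot_sparse` are hypothetical helpers, not Spark's implementation.

```python
from collections import Counter

education = [' Bachelors', ' Bachelors', ' HS-grad', ' 11th', ' Bachelors', ' Masters',
             ' 9th', ' HS-grad', ' Masters', ' Bachelors']

def string_index(labels):
    """Map each distinct label to a float index: most frequent first, ties alphabetical."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: float(i) for i, lab in enumerate(ordered)}

def one_hot_sparse(index, num_categories, drop_last=True):
    """Return a (size, indices, values) triple shaped like Spark's SparseVector printout."""
    size = num_categories - 1 if drop_last else num_categories
    idx = int(index)
    if idx < size:
        return (size, [idx], [1.0])
    # With drop_last, the last category has no slot and becomes an all-zero vector
    return (size, [], [])

mapping = string_index(education)
encoded = [one_hot_sparse(mapping[e], len(mapping)) for e in education]
for label, vec in zip(education, encoded):
    print(repr(label), mapping[label], vec)
```

Note that the leading spaces in the raw values take part in the alphabetical comparison; here they are the same for every label, so ' 11th' sorts before ' 9th' because '1' < '9'.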
I am trying to split the vector into multiple columns, just like pd.get_dummies does. I've been following "How to split Vector into columns - using PySpark". It worked before, but suddenly it doesn't:
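def extract(row):
    # Keep the label, then append one value per one-hot slot
    return (row.education, ) + tuple(row.education_ohe.toArray())

# X_cat: presumably the transformed DataFrame shown above
temp = X_cat.select("education", "education_ohe")
test = temp.rdd.map(extract).toDF(["education", "education_ohe"])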
This raises the error in the title. After searching SO (Pyspark: PicklingError: Could not serialize object:), I read that this error occurs when trying to serialize a DataFrame object, but I am only converting a mapped RDD into a DataFrame. I've also tried .collect() instead of .toDF() and got the same error, so I am guessing the error happens when mapping the RDD elements.
There are no null values, and the function works fine when I apply it to a single element:
rows = temp.rdd.collect()
extract(rows[0])
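For reference, this is the shape extract is meant to return per row: the label followed by the dense form of the one-hot vector. A pure-Python sketch, assuming the vector is given as a (size, indices, values) triple like the (4,[0],[1.0]) printout above; `to_dense` and `extract_plain` are hypothetical stand-ins for SparseVector.toArray() and extract, not Spark APIs.

```python
def to_dense(size, indices, values):
    """Expand a sparse (size, indices, values) triple into a list of floats."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = float(v)
    return dense

def extract_plain(label, sparse):
    """Label first, then one float per one-hot slot (mirrors what extract builds)."""
    return (label,) + tuple(to_dense(*sparse))

print(extract_plain(' Bachelors', (4, [0], [1.0])))
print(extract_plain(' 9th', (4, [], [])))
```

Each resulting tuple has 1 + 4 = 5 fields, while toDF above is given only two names, so those names cover just the first two fields.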
Eventually I will extend this to all one-hot encoded columns.
Desired output:
age  Bachelors  HS-grad  11th  ...  State-gov  Self-emp-not-inc  ...
39   1          0        0          1          0
50   1          0        0          0          1
38   0          1        0     ...  0          0                 ...
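For comparison, the desired layout is what pandas.get_dummies produces on the original pandas frame. A minimal sketch, assuming pandas is installed: the leading spaces in the raw values are stripped first so the column names match the table, an empty prefix keeps the names bare, and dtype=int gives 0/1 instead of booleans (the default dtype differs across pandas versions). get_dummies keeps every category, so matching it in Spark would presumably need OneHotEncoder(dropLast=False) to get a 9th column too.

```python
import pandas as pd

pdf = pd.DataFrame({
    "age": [39, 50, 38, 53, 28, 37, 49, 52, 33, 22],
    "education": [' Bachelors', ' Bachelors', ' HS-grad', ' 11th', ' Bachelors', ' Masters',
                  ' 9th', ' HS-grad', ' Masters', ' Bachelors'],
    "workclass": [' State-gov', ' Self-emp-not-inc', ' Private', ' Private', ' Private',
                  ' Private', ' Private', ' Self-emp-not-inc', ' Private', ' Self-emp-not-inc'],
})

# Strip the leading spaces so the dummy column names are clean
for c in ["education", "workclass"]:
    pdf[c] = pdf[c].str.strip()

# One 0/1 column per category; empty prefix/sep so columns are just the category names
dummies = pd.get_dummies(pdf, columns=["education", "workclass"],
                         prefix="", prefix_sep="", dtype=int)
print(dummies.head(3))
```

Non-encoded columns (here only age) come first, followed by the dummy columns of each encoded column with categories in sorted order.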