I am new to PySpark's implementation of ML algorithms. I am trying to build a classification model, which could be logistic regression, random forest, a multiclass classifier, etc. For now I have written the code with RandomForestClassifier.
Problem: The StandardScaler data preprocessing step runs when executed line by line, but fails when I run it inside a function call. Sample code and error description:

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import VectorAssembler, StandardScaler, MinMaxScaler

    def preprocessData(data='', all_cols='', categoricalcols=''):
        # Preprocessing the customer data
        print("Preparing the data for model fitting.")
        # Assemble the input columns into a single vector column
        print("Assembling the data.")
        assembler = VectorAssembler(inputCols=all_cols, outputCol="assmbldFeatures")
        # Standardize the data (scale to unit standard deviation, no centering)
        print("Standardizing and Scaling the data.")
        stdScaler = StandardScaler(inputCol=assembler.getOutputCol(), outputCol="stdScldFeatures",
                                   withStd=True, withMean=False)
        # Min-max scale the standardized features
        print("Performing min max scaling.")
        minMaxScaler = MinMaxScaler(inputCol=stdScaler.getOutputCol(), outputCol="minMaxScldFeatures")
        # Chain the three stages, fit them on the data, and transform the same data
        pipeline = Pipeline(stages=[assembler, stdScaler, minMaxScaler])
        prcsdData = pipeline.fit(data).transform(data)
        return prcsdData

It fails even when I use a Pipeline to sequence the steps. Kindly advise. I am basically trying to modularize the code.
Error:

    Performing min max scaling.
    [Stage 0:==============================================> (13 + 2) / 16]18/05/02 05:13:42 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 16, lx955.mutualofomaha.com, executor 2): java.lang.IllegalArgumentException: requirement failed: Vector should have dimension larger than zero.
        at scala.Predef$.require(Predef.scala:224)
        at org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:74)
        at org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:67)
        at org.apache.spark.mllib.feature.StandardScaler$$anonfun$2.apply(StandardScaler.scala:58)
        at org.apache.spark.mllib.feature.StandardScaler$$anonfun$2.apply(StandardScaler.scala:58)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
        at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)

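
The message "Vector should have dimension larger than zero" is raised while StandardScaler summarizes the assembled vectors. That would be consistent with the assembler receiving no usable input columns, although this is only an assumption and not confirmed by the trace. A minimal sketch of a guard that could run before the pipeline is built is shown here. The helper name `check_feature_columns` and its arguments are hypothetical, and `available_cols` stands in for what `data.columns` would return in PySpark.

```python
def check_feature_columns(available_cols, all_cols):
    """Return all_cols as a list, or raise ValueError if it looks unusable.

    Hypothetical helper: it only inspects plain Python values, so it can be
    called before any Spark stage is created.
    """
    # A bare string (such as the '' default) is not a list of column names.
    if isinstance(all_cols, str):
        raise ValueError("all_cols must be a list of column names, not a string")
    cols = list(all_cols)
    # An empty list would give assembled vectors with no entries at all.
    if not cols:
        raise ValueError("all_cols is empty; assembled vectors would have dimension zero")
    # Every requested column should exist in the DataFrame.
    missing = [c for c in cols if c not in available_cols]
    if missing:
        raise ValueError("columns not found in data: %s" % missing)
    return cols


# Column names taken from the sample data in this post.
available = ["incm", "incm_2", "med", "age"]
print(check_feature_columns(available, ["incm", "age"]))
```

Inside `preprocessData`, a call such as `all_cols = check_feature_columns(data.columns, all_cols)` placed before the `VectorAssembler` line would surface an empty or mistyped column list as a readable Python error instead of a failure on the executors.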
Sample data:

    | incm | incm_2 | med | age |
    |   56 |    181 |  18 |  47 |
    |   46 |    198 |  31 |  54 |
    |   91 |    742 |  54 |  45 |
    |   54 |    187 |  55 |  57 |
    |   58 |    123 |  61 |  48 |
    |   50 |    196 |  49 |  44 |
    |   32 |    145 |  39 |  55 |
    |   30 |    101 |  29 |  55 |
    |   62 |    177 |  44 |  53 |
    |   53 |    140 |  30 |  57 |
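
For reference, here is a minimal pure-Python sketch of what the two scaling stages are expected to compute on the sample rows, which may help when checking the output once the pipeline runs. It is not Spark code. It assumes Spark's documented behavior that StandardScaler divides by the corrected (n - 1) sample standard deviation and that MinMaxScaler uses its default output range of [0, 1]. The names `rows`, `columns` and `std_then_min_max` are made up for illustration, and constant columns are not handled.

```python
import statistics

# Sample rows copied from the table in this post.
columns = ["incm", "incm_2", "med", "age"]
rows = [
    (56, 181, 18, 47),
    (46, 198, 31, 54),
    (91, 742, 54, 45),
    (54, 187, 55, 57),
    (58, 123, 61, 48),
    (50, 196, 49, 44),
    (32, 145, 39, 55),
    (30, 101, 29, 55),
    (62, 177, 44, 53),
    (53, 140, 30, 57),
]


def std_then_min_max(values):
    """Divide by the sample standard deviation, then rescale to [0, 1]."""
    std = statistics.stdev(values)        # corrected (n - 1) estimate
    scaled = [v / std for v in values]    # mirrors withStd=True, withMean=False
    lo, hi = min(scaled), max(scaled)
    # Assumes hi > lo, i.e. the column is not constant.
    return [(s - lo) / (hi - lo) for s in scaled]


# Scale each column independently, as the feature scalers do per vector slot.
result = {
    name: std_then_min_max([row[i] for row in rows])
    for i, name in enumerate(columns)
}

for name in columns:
    print(name, [round(x, 3) for x in result[name]])
```

Because dividing a column by a positive constant does not change its min-max rescaling, the values in `minMaxScldFeatures` should match, up to floating-point error, what MinMaxScaler alone would produce on the assembled features.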