I tried excluding all null values and make my label starts from '0' instead of '1'. Both don't solve the error. The error says failed to execute OneHotEncoder, but I am not using it in my code.. Followings are my code and the error I got.
This is my DataFrame. I dropped all null values.
low_ask_sizes:integer
ask_price:integer
collaboration:integer
released_months:integer
brand:string
cate_color:string
label:integer
Split the data into train and test
train, validation, test = data.randomSplit([0.7, 0.2, 0.1], 1234)
Convert nominal column to numerical column
from pyspark.ml.feature import StringIndexer
categorical_columns = [item[0] for item in data.dtypes if item[1].startswith('string')]
numeric_columns = [item[0] for item in data.dtypes if item[1].startswith('int')]
indexers = [StringIndexer(inputCol=column, outputCol='{0}_index'.format(
column), handleInvalid = 'keep') for column in categorical_columns]
Assemble feature columns into one feature vector column
from pyspark.ml.feature import VectorAssembler
featuresCreator = VectorAssembler(
inputCols=[indexer.getOutputCol() for indexer in indexers] + numeric_columns,
outputCol='features')
layers = [len(featuresCreator.getInputCols()), 4, 2, 2]
Select features (vector column) and label column
prepared_data = output.select('features', 'label')
This is the structure of prepared_data:
features:udt
label:integer
Classifier
from pyspark.ml.classification import MultilayerPerceptronClassifier
classifier = MultilayerPerceptronClassifier(labelCol='label',
featuresCol='features',
maxIter=100,
layers=layers,
blockSize=128,
seed=1234)
Build the pipeline and start training the model
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=indexers + [featuresCreator, classifier])
model = pipeline.fit(train)
It gives me this error on the last line:
Py4JJavaError Traceback (most recent call last)
<command-2394554338211359> in <module>
2
3 pipeline = Pipeline(stages=indexers + [featuresCreator, classifier])
----> 4 model = pipeline.fit(train)
/databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
159 return self.copy(params)._fit(dataset)
160 else:
--> 161 return self._fit(dataset)
162 else:
163 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/databricks/spark/python/pyspark/ml/pipeline.py in _fit(self, dataset)
112 dataset = stage.transform(dataset)
113 else: # must be an Estimator
--> 114 model = stage.fit(dataset)
115 transformers.append(model)
116 if i < indexOfLastEstimator:
/databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
159 return self.copy(params)._fit(dataset)
160 else:
--> 161 return self._fit(dataset)
162 else:
163 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/databricks/spark/python/pyspark/ml/wrapper.py in _fit(self, dataset)
333
334 def _fit(self, dataset):
--> 335 java_model = self._fit_java(dataset)
336 model = self._create_model(java_model)
337 return self._copyValues(model)
/databricks/spark/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
330 """
331 self._transfer_params_to_java()
--> 332 return self._java_obj.fit(dataset._jdf)
333
334 def _fit(self, dataset):
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
108 def deco(*a, **kw):
109 try:
--> 110 return f(*a, **kw)
111 except py4j.protocol.Py4JJavaError as e:
112 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o3766.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 773.0 failed 1 times, most recent failure: Lost task 0.0 in stage 773.0 (TID 1010) (ip-10-172-226-106.us-west-2.compute.internal executor driver): org.apache.spark.SparkException: Failed to execute user defined function(OneHotEncoderModel$$Lambda$6652/897874937: (double, int) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)