
I'm trying to fit a linear regression on a data frame with two columns, predicting performance as a function of a single categorical variable, device_class:

df.select('performance', 'device_class').show(5, False)

+----------------+------------+
|performance     |device_class|
+----------------+------------+
|35              |2           |
|35              |2           |
|35              |2           |
|25              |2           |
|5               |1           |
+----------------+------------+
only showing top 5 rows

df.select('performance', 'device_class').printSchema()

root
 |-- performance: integer (nullable = true)
 |-- device_class: integer (nullable = true)


from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.feature import *

numEncoder = OneHotEncoder(dropLast=False, inputCol="device_class", outputCol="dev_class_cat")

fAssembler = VectorAssembler(
    inputCols=['dev_class_cat'],
    outputCol='features')

lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, labelCol='performance', featuresCol='features')

pipeline = Pipeline(stages=[numEncoder,fAssembler])

modelTmp = pipeline.fit(df)
modelTmp.transform(df).select('performance', 'dev_class_cat', 'features').show(5, False)

+----------------+-------------+-------------+
|performance     |dev_class_cat|features     |
+----------------+-------------+-------------+
|35              |(5,[2],[1.0])|(5,[2],[1.0])|
|35              |(5,[2],[1.0])|(5,[2],[1.0])|
|35              |(5,[2],[1.0])|(5,[2],[1.0])|
|25              |(5,[2],[1.0])|(5,[2],[1.0])|
|5               |(5,[1],[1.0])|(5,[1],[1.0])|
+----------------+-------------+-------------+
only showing top 5 rows

This works fine, so far so good. But if I add the regression stage to the pipeline:

pipeline = Pipeline(stages=[numEncoder, fAssembler, lr])
modelTmp = pipeline.fit(df)

I get this error:

Py4JJavaError: An error occurred while calling o2503.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in stage 188.0 failed 4 times, most recent failure: Lost task 19.3 in stage 188.0 (TID 375286, rs119.hadoop.pvt): scala.MatchError: [25,1.0,(5,[1],[1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
    at org.apache.spark.ml.regression.LinearRegression$$anonfun$5.apply(LinearRegression.scala:200)
    at org.apache.spark.ml.regression.LinearRegression$$anonfun$5.apply(LinearRegression.scala:200)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:214)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:919)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Actually I get many of these errors. I found the thread MatchError while accessing vector column in Spark 2.0, which reports a similar error, but I'm not using anything from the legacy mllib API. Maybe something is wrong with the sparse vector?
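
For what it's worth, here are two checks I'm considering next. This is just a sketch; pipelineNoLr, dfDbl, and pipelineFull are names I'm introducing here, and the second check is only a guess based on how the failing row prints:

from pyspark.sql.functions import col

# Check 1: confirm the assembled vectors are the new pyspark.ml.linalg
# type and not the legacy pyspark.mllib.linalg one, using the two-stage
# pipeline that fits without error.
pipelineNoLr = Pipeline(stages=[numEncoder, fAssembler])
row = pipelineNoLr.fit(df).transform(df).select('features').first()
print(type(row['features']))  # hoping for pyspark.ml.linalg.SparseVector

# Check 2 (assumption: the problem is the label, not the vector): the
# failing row prints as [25,1.0,(5,[1],[1.0])], i.e. the label shows as
# 25 rather than 25.0, so maybe LinearRegression wants a double label.
# Cast performance to double before fitting the full pipeline.
dfDbl = df.withColumn('performance', col('performance').cast('double'))
pipelineFull = Pipeline(stages=[numEncoder, fAssembler, lr])
modelTmp = pipelineFull.fit(dfDbl)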
