
I trained a LogisticRegression model in PySpark (ML package), and the result of the prediction is a PySpark DataFrame, cv_predictions_prod (see [1]). The probability column (see [2]) is a vector type (see [3]).

[1]
type(cv_predictions_prod)
pyspark.sql.dataframe.DataFrame

[2]
cv_predictions_prod.select('probability').show(10, False)
+----------------------------------------+
|probability                             |
+----------------------------------------+
|[0.31559134817066054,0.6844086518293395]|
|[0.8937864350711228,0.10621356492887715]|
|[0.8615878905395029,0.1384121094604972] |
|[0.9594427633777901,0.04055723662220989]|
|[0.5391547673698157,0.46084523263018434]|
|[0.2820729747752462,0.7179270252247538] |
|[0.7730465873083118,0.22695341269168817]|
|[0.6346585276598942,0.3653414723401058] |
|[0.6346585276598942,0.3653414723401058] |
|[0.637279255218404,0.362720744781596]   |
+----------------------------------------+
only showing top 10 rows

[3]
cv_predictions_prod.printSchema()
root
 ...
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)

How do I parse the vector column of the PySpark DataFrame, so that I can create a new column that just pulls the first element of each probability vector?

This question is similar to the ones linked below, but their solutions didn't work for me or weren't clear:

How to access the values of denseVector in PySpark

How to access element of a VectorUDT column in a Spark DataFrame?

user2205916

1 Answer


Update:

It seems like there is a bug in Spark that prevents you from accessing individual elements of a dense vector in a select statement. Normally you would be able to access them just like you would a numpy array, but when trying to run the code previously posted, you may get the error pyspark.sql.utils.AnalysisException: "Can't extract value from probability#12;"

So, one way to work around this bug is to use a UDF. Similar to the other question, you can define a UDF in the following way:

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# UDF that pulls the first element out of the probability vector as a float
firstelement = udf(lambda v: float(v[0]), FloatType())
cv_predictions_prod.select(firstelement('probability')).show()
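
The question asks for a new column rather than just a select; a minimal sketch of that, reusing the same firstelement UDF (the column name prob_0 is just an illustrative choice):

# Attach the extracted first element as a new column (prob_0 is a hypothetical name)
cv_predictions_prod = cv_predictions_prod.withColumn('prob_0', firstelement('probability'))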

Behind the scenes this still accesses the elements of the DenseVector like a numpy array, but it doesn't throw the same bug as before.
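
If you are on Spark 3.0 or later (an assumption; the question predates that release), pyspark.ml.functions.vector_to_array converts the vector column into a plain array column that can be indexed directly, without a UDF:

from pyspark.ml.functions import vector_to_array

# Convert the vector column to an array column, then take element 0 (prob_0 is a hypothetical name)
cv_predictions_prod.select(vector_to_array('probability')[0].alias('prob_0')).show()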


Since this is getting a lot of upvotes, I figured I should strike through the incorrect portion of this answer.

Original answer: A dense vector is just a wrapper for a numpy array. So you can access the elements in the same way that you would access the elements of a numpy array.

There are several ways to access individual elements of an array in a dataframe. One is to reference the column explicitly, as cv_predictions_prod['probability'], in your select statement. Referencing the column that way lets you perform operations on it, such as selecting the first element of the array. For example:

cv_predictions_prod.select(cv_predictions_prod['probability'][0]).show()

should solve the problem.

DavidWayne
  • No, it won't work. `VectorUDT` is not represented as an `ArrayType`. – zero323 Jun 13 '17 at 11:36
  • From the pyspark documentation: A dense vector represented by a value array. We use numpy array for storage and arithmetics will be delegated to the underlying numpy array. http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.linalg.DenseVector . What is the error that you get when trying to run the sample code? – DavidWayne Jun 13 '17 at 13:16
  • This is not a bug. Spark `DataFrame` is not even close to Python object, it doesn't use NumPy behind the scenes, unless you explicitly convert it to Python RDD (batch Python eval used by udf) and `VectorUDT` is not a native SQL type, hence it doesn't provide the same features as for example `ArrayType`. – zero323 Jun 13 '17 at 20:55
  • updated answer works but does old answer work? – dksahuji Jul 12 '17 at 06:41
  • Gee, thanks, I cannot believe that such a simple task is *so* complicated. – Czechnology May 23 '19 at 20:53
  • also worked with withColumn – Shuai Liu Oct 25 '19 at 09:37
  • Worked for me, but my colleague told me that a UDF reduces PySpark performance. Is that true? Any way to replace it? – Noppu Nov 27 '19 at 12:35