27

I have a DataFrame df with a VectorUDT column named features. How do I get a single element of that column, say the first element?

I've tried doing the following

from pyspark.sql.functions import udf
first_elem_udf = udf(lambda row: row.values[0])
df.select(first_elem_udf(df.features)).show()

but I get a net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype) error. I get the same error if I use first_elem_udf = udf(lambda row: row.toArray()[0]) instead.

I also tried explode() but I get an error because it requires an array or map type.

This should be a common operation, I think.

zero323
Christian Alis

5 Answers

28

Convert output to float:

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, udf

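def ith_(v, i):
    try:
        return float(v[i])
    # An out-of-range index typically raises IndexError; ValueError is kept from the original answer
    except (IndexError, ValueError):
        return None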

ith = udf(ith_, DoubleType())

Example usage:

from pyspark.ml.linalg import Vectors

df = sc.parallelize([
    (1, Vectors.dense([1, 2, 3])),
    (2, Vectors.sparse(3, [1], [9]))
]).toDF(["id", "features"])

df.select(ith("features", lit(1))).show()

## +-----------------+
## |ith_(features, 1)|
## +-----------------+
## |              2.0|
## |              9.0|
## +-----------------+

Explanation:

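The value returned by a Python UDF has to be reserialized to an equivalent Java object, and NumPy scalars such as numpy.float64 cannot be converted that way, which is what triggers the PickleException. To access individual values (beware of SparseVectors, whose values array holds only the stored entries), use the item method: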

v.values.item(0)

which returns a standard Python scalar. Similarly, to access all values as a dense structure:

v.toArray().tolist()
zero323
  • I get `Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)`. Any clues? – haneulkim Jan 27 '22 at 19:52
3

If you prefer using spark.sql, you can use the following custom function 'to_array' to convert the vector to an array. You can then manipulate it like any other array.

from pyspark.sql import SQLContext
from pyspark.sql.types import ArrayType, DoubleType

def to_array_(v):
    return v.toArray().tolist()

sqlContext = SQLContext(spark.sparkContext, sparkSession=spark, jsqlContext=None)
sqlContext.udf.register("to_array", to_array_, ArrayType(DoubleType()))

Example:

from pyspark.ml.linalg import Vectors

df = sc.parallelize([
    (1, Vectors.dense([1, 2, 3])),
    (2, Vectors.sparse(3, [1], [9]))
]).toDF(["id", "features"])

df.createOrReplaceTempView("tb")

spark.sql("""select *, to_array(features)[1] Second from tb""").toPandas()

Output:

   id         features  Second
0   1  [1.0, 2.0, 3.0]     2.0
1   2  (0.0, 9.0, 0.0)     9.0
thebluephantom
Ben2018
2

I ran into the same problem of not being able to use explode(). One thing you can do is use VectorSlicer from pyspark.ml.feature, like so:

from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row

slicer = VectorSlicer(inputCol="features", outputCol="features_one", indices=[0])

output = slicer.transform(df)

output.select("features", "features_one").show()
DataBach
  • I like this solution best, but it still results in the "features_one" column being a 1-element list. – Dr. Andrew Dec 22 '20 at 17:52
  • I have the same problem. Is there a quick way to extract the single element? Also, can we write a pipeline to "explode" multiple elements from the vector? – Shi Chen Jan 26 '22 at 18:45
1

This is for anyone trying to split the probability column generated by a trained PySpark ML model into usable columns. It uses neither a UDF nor NumPy, and it only works for binary classification. Here lr_pred is the DataFrame that holds the predictions from the logistic regression model.

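from pyspark.sql.functions import regexp_replace, split
from pyspark.sql.types import DoubleType

prob_df1 = lr_pred.withColumn("probability", lr_pred["probability"].cast("String"))

prob_df = prob_df1.withColumn("probabilityre", split(regexp_replace("probability", r"^\[|\]", ""), ",")[1].cast(DoubleType()))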
n1tk
Nidhi
0

Since Spark 3.0.0, this can be done without a UDF:

from pyspark.ml.functions import vector_to_array

https://discuss.dizzycoding.com/how-to-split-vector-into-columns-using-pyspark/

Why is Vector[Double] used in the results? That's not a very nice data type.

ChrisGPT was on strike
Zach Hou