70

Context: I have a DataFrame with two columns, word and vector, where the column type of "vector" is VectorUDT.

An Example:

word    |  vector
assert  | [435, 323, 324, 212, ...]

And I want to get this:

word   |  v1  |  v2  |  v3  |  v4  | ...
assert |  435 |  323 |  324 |  212 | ...
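
For reference, a toy DataFrame with this shape can be built as follows (a rough sketch; the values are illustrative and spark is assumed to be an active SparkSession):

from pyspark.ml.linalg import Vectors

df = spark.createDataFrame(
    [("assert", Vectors.dense([435.0, 323.0, 324.0, 212.0]))],
    ["word", "vector"])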

Question:

How can I split a column containing vectors into several columns, one per dimension, using PySpark?

Thanks in advance

zero323
sedioben
  • See [How to Access Element of a VectorUDT column in a Spark Dataframe](https://stackoverflow.com/questions/39555864/how-to-access-element-of-a-vectorudt-column-in-a-spark-dataframe) for a solution with better performance. (I have done timings on both. I would mark this one as a duplicate if I had the reputation.) – hwrd Nov 01 '19 at 20:54
  • @hwrd Could you share the benchmarking code you've used? TIA. – 10465355 Nov 04 '19 at 18:04
  • @user10465355 added as a 'solution' below because it's too big for comments. (The organization is a little quirky because I pulled it out of a Jupyter notebook and then replaced the %%timeit cell magic.) – hwrd Nov 07 '19 at 20:42

4 Answers

102

Spark >= 3.0.0

Since Spark 3.0.0 this can be done without using a UDF.

from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

(df
    .withColumn("xs", vector_to_array("vector"))
    .select(["word"] + [col("xs")[i] for i in range(3)]))

## +-------+-----+-----+-----+
## |   word|xs[0]|xs[1]|xs[2]|
## +-------+-----+-----+-----+
## | assert|  1.0|  2.0|  3.0|
## |require|  0.0|  2.0|  0.0|
## +-------+-----+-----+-----+
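
If you want the columns named v1, v2, ... as in the question, you can alias each extracted element; the hard-coded length of 3 is an assumption matching the example data:

from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

(df
    .withColumn("xs", vector_to_array("vector"))
    .select(["word"] + [col("xs")[i].alias("v{}".format(i + 1)) for i in range(3)]))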

Spark < 3.0.0

One possible approach is to convert to and from RDD:

from pyspark.ml.linalg import Vectors

df = sc.parallelize([
    ("assert", Vectors.dense([1, 2, 3])),
    ("require", Vectors.sparse(3, {1: 2}))
]).toDF(["word", "vector"])

def extract(row):
    return (row.word, ) + tuple(row.vector.toArray().tolist())

df.rdd.map(extract).toDF(["word"])  # Vector values will be named _2, _3, ...

## +-------+---+---+---+
## |   word| _2| _3| _4|
## +-------+---+---+---+
## | assert|1.0|2.0|3.0|
## |require|0.0|2.0|0.0|
## +-------+---+---+---+
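
If the vector length is known, you can also pass the full list of names to toDF so the columns are not left as _2, _3, ... (assuming 3-dimensional vectors as above):

df.rdd.map(extract).toDF(["word", "v1", "v2", "v3"])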

An alternative solution would be to create a UDF:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType

def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    # Important: asNondeterministic requires Spark 2.3 or later
    # It can be safely removed i.e.
    # return udf(to_array_, ArrayType(DoubleType()))(col)
    # but at the cost of decreased performance
    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)

(df
    .withColumn("xs", to_array(col("vector")))
    .select(["word"] + [col("xs")[i] for i in range(3)]))

## +-------+-----+-----+-----+
## |   word|xs[0]|xs[1]|xs[2]|
## +-------+-----+-----+-----+
## | assert|  1.0|  2.0|  3.0|
## |require|  0.0|  2.0|  0.0|
## +-------+-----+-----+-----+
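
The range(3) above assumes the vector length is known. If it is not, one option (assuming the DataFrame is non-empty and the first vector is not null) is to read the length from a single row and build the select dynamically, reusing the to_array helper:

n = len(df.first()["vector"])

(df
    .withColumn("xs", to_array(col("vector")))
    .select(["word"] + [col("xs")[i] for i in range(n)]))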

For the Scala equivalent see Spark Scala: How to convert Dataframe[vector] to DataFrame(f1: Double, ..., fn: Double).

user10938362
zero323
  • Performance-wise, it is much smarter to use the `.map/.toDF` functions, as they will almost always be faster than the UDF implementation (unless you're using a vectorized UDF from Spark 2.2+). – tmarthal Nov 20 '17 at 23:47
  • Thanks for that comment. Won't the RDD API become deprecated at some point? So I'd think the latter is recommended, or am I wrong? –  Apr 16 '18 at 09:58
  • Note that, as of Spark 2.3, a `vectorized UDF` requires the input and output columns to both be Java primitives and thus it will not work for this application. – Zo the Relativist Apr 20 '18 at 13:52
  • Related JIRA ticket to improve this: https://issues.apache.org/jira/browse/SPARK-19217 – pault May 22 '18 at 15:05
  • @zero323 - For me, the UDF way seems to have worked. How do I save the result as a DF? When I do a .show at the end, it shows me the DF just the way I want it. Just can't seem to find a way to name/save it. – Anonymous Person Feb 28 '19 at 12:45
  • Performance-wise, using toList creates a Python list object and populates it with Python float objects, which then need to be converted back to Java doubles. Using the rdd is much slower than using a udf that lets Spark SQL handle most of the work, see: [39555864](https://stackoverflow.com/questions/39555864/how-to-access-element-of-a-vectorudt-column-in-a-spark-dataframe). I have done extensive timing on this issue, and am quite certain the SQL UDF is superior. I would recommend that this one be listed as a duplicate, and linked to the above. – hwrd Oct 31 '19 at 23:55
  • When I try the `map...toDf` code, I get the following: `ValueError: RDD is empty`... any help? – Chuck Apr 06 '20 at 10:54
  • I'm facing the same issue as @Chuck, but it happens when creating the df with `.toDF(["word", "vector"])` – haneulkim Jan 25 '22 at 14:53
4

To split the rawPrediction or probability columns generated after training a PySpark ML model into pandas columns, you can do it like this:

import pandas as pd
your_pandas_df['probability'].apply(lambda x: pd.Series(x.toArray()))
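
To give the new columns names and attach them back to the original frame, a minimal sketch continuing from the line above (the p1, p2, ... names are just a choice for illustration):

prob_cols = your_pandas_df['probability'].apply(lambda x: pd.Series(x.toArray()))
prob_cols.columns = ['p{}'.format(i + 1) for i in prob_cols.columns]
result = pd.concat([your_pandas_df, prob_cols], axis=1)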
Nic Scozzaro
1

It is much faster to use the i_th udf from [How to Access Element of a VectorUDT column in a Spark Dataframe](https://stackoverflow.com/questions/39555864/how-to-access-element-of-a-vectorudt-column-in-a-spark-dataframe).

The extract function in zero323's answer above uses toList: it creates a Python list object, populates it with Python float objects, and the desired element then has to be found by traversing the list and converted back to a Java double, once per row. Going through the rdd is much slower than the to_array udf, which also calls toList, but both are much slower than a udf that lets Spark SQL handle most of the work.

Timing code comparing the rdd extract and the to_array udf proposed here to the i_th udf from 39555864:

from pyspark.context import SparkContext
from pyspark.sql import Row, SQLContext, SparkSession
from pyspark.sql.functions import lit, udf, col
from pyspark.sql.types import ArrayType, DoubleType
import pyspark.sql.dataframe
from pyspark.sql.functions import pandas_udf, PandasUDFType

sc = SparkContext('local[4]', 'FlatTestTime')

spark = SparkSession(sc)
spark.conf.set("spark.sql.execution.arrow.enabled", True)

from pyspark.ml.linalg import Vectors

# copy the two rows in the test dataframe a bunch of times,
# make this small enough for testing, or go for "big data" and be prepared to wait
REPS = 20000

df = sc.parallelize([
    ("assert", Vectors.dense([1, 2, 3]), 1, Vectors.dense([4.1, 5.1])),
    ("require", Vectors.sparse(3, {1: 2}), 2, Vectors.dense([6.2, 7.2])),
] * REPS).toDF(["word", "vector", "more", "vorpal"])

def extract(row):
    return (row.word, ) + tuple(row.vector.toArray().tolist(),) + (row.more,) + tuple(row.vorpal.toArray().tolist(),)

def test_extract():
    return df.rdd.map(extract).toDF(['word', 'vector__0', 'vector__1', 'vector__2', 'more', 'vorpal__0', 'vorpal__1'])

def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    return udf(to_array_, ArrayType(DoubleType()))(col)

def test_to_array():
    df_to_array = df.withColumn("xs", to_array(col("vector"))) \
        .select(["word"] + [col("xs")[i] for i in range(3)] + ["more", "vorpal"]) \
        .withColumn("xx", to_array(col("vorpal"))) \
        .select(["word"] + ["xs[{}]".format(i) for i in range(3)] + ["more"] + [col("xx")[i] for i in range(2)])
    return df_to_array

# pack up to_array into a tidy function
def flatten(df, vector, vlen):
    fieldNames = df.schema.fieldNames()
    if vector in fieldNames:
        names = []
        for fieldname in fieldNames:
            if fieldname == vector:
                names.extend([col(vector)[i] for i in range(vlen)])
            else:
                names.append(col(fieldname))
        return df.withColumn(vector, to_array(col(vector)))\
                 .select(names)
    else:
        return df

def test_flatten():
    dflat = flatten(df, "vector", 3)
    dflat2 = flatten(dflat, "vorpal", 2)
    return dflat2

def ith_(v, i):
    try:
        return float(v[i])
    except ValueError:
        return None

ith = udf(ith_, DoubleType())

select = ["word"]
select.extend([ith("vector", lit(i)) for i in range(3)])
select.append("more")
select.extend([ith("vorpal", lit(i)) for i in range(2)])

# %% timeit ...
def test_ith():
    return df.select(select)

if __name__ == '__main__':
    import timeit

    # make sure these work as intended
    test_ith().show(4)
    test_flatten().show(4)
    test_to_array().show(4)
    test_extract().show(4)

    print("i_th\t\t",
          timeit.timeit("test_ith()",
                       setup="from __main__ import test_ith",
                       number=7)
         )
    print("flatten\t\t",
          timeit.timeit("test_flatten()",
                       setup="from __main__ import test_flatten",
                       number=7)
         )
    print("to_array\t",
          timeit.timeit("test_to_array()",
                       setup="from __main__ import test_to_array",
                       number=7)
         )
    print("extract\t\t",
          timeit.timeit("test_extract()",
                       setup="from __main__ import test_extract",
                       number=7)
         )

Results:

i_th         0.05964796099999958
flatten      0.4842299350000001
to_array     0.42978780299999997
extract      2.9254476840000017
hwrd
  • That's some great looking code, but what does it do? Can you add some explanation for the different methods used? – Chuck Apr 06 '20 at 10:52
  • @Chuck the code times the two functions for Spark < 3.0.0 from the accepted solution here and the `ith` function proposed in https://stackoverflow.com/questions/39555864/how-to-access-element-of-a-vectorudt-column-in-a-spark-dataframe to show that the latter solution is superior. – hwrd May 04 '20 at 19:48
  • You are comparing DAG constructions and not the actual transformations. – A.Eddine Jul 24 '20 at 14:01
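
One hedged way to address that last point: wrap each test in an action such as count() so the transformations actually execute (a sketch under that assumption; the test_ith_materialized name is my own and the original benchmark did not do this):

def test_ith_materialized():
    # an action forces execution, so the timing covers the transformation
    # itself rather than just building the query plan
    return df.select(select).count()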
0
from pyspark.sql.types import DoubleType

def splitVector(df, new_features=['f1', 'f2']):
    # new_features should have one name per dimension of the vector column
    schema = df.schema
    cols = df.columns

    for name in new_features:
        schema = schema.add(name, DoubleType(), True)

    # append the flattened vector values (works for dense and sparse vectors)
    # to each row and rebuild the DataFrame with the extended schema
    return spark.createDataFrame(
        df.rdd.map(lambda row: [row[c] for c in cols] + row.features.toArray().tolist()),
        schema)

The function turns the feature vector column (assumed here to be named features) into separate columns.
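
For example, a hypothetical call on a small DataFrame with a 3-dimensional features column (the data below is made up for illustration):

from pyspark.ml.linalg import Vectors

demo = spark.createDataFrame(
    [("assert", Vectors.dense([1.0, 2.0, 3.0]))],
    ["word", "features"])

splitVector(demo, new_features=["f1", "f2", "f3"]).show()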

Shuai Liu