
I have a Spark DataFrame with a column named features that holds vectors of data. This column is the output of PySpark's StandardScaler. Below I create a sample dataset similar to the one I have.

# create sample data
from pyspark.ml.feature import VectorAssembler

arr = [[1, 2, 3], [4, 5, 6]]
df_example = spark.createDataFrame(arr, ['A', 'B', 'C'])

# assemble all columns into a single vector column
assembler = VectorAssembler(inputCols=df_example.columns, outputCol='features')
df_vector = assembler.transform(df_example).select('features')


>>> df_vector.show()
+-------------+
|     features|
+-------------+
|[1.0,2.0,3.0]|
|[4.0,5.0,6.0]|
+-------------+

I want to find the Euclidean distance between each vector and a particular cluster center (an array of the same length). Assume the cluster center is:

import numpy as np

cluster_center_0 = np.array([0.6, 0.7, 0.8])
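
To make the goal concrete, for the first row the distance should work out to:

v = np.array([1.0, 2.0, 3.0])                 # first row of df_vector
np.sqrt(np.sum((v - cluster_center_0) ** 2))  # sqrt(0.16 + 1.69 + 4.84) = 2.586503431275513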

How do I achieve this? I tried writing a SQL query, hoping I could access the elements inside the vector using OFFSET and compute the distances from there, but it didn't work. This is the query I used (my knowledge of SQL is very limited):

SELECT aml_cluster_inpt_features
aml_cluster_inpt_features[OFFSET(0)] AS offset_0,
aml_cluster_inpt_features[OFFSET(1)] AS offset_1,
aml_cluster_inpt_features[OFFSET(2)] AS offset_2,
aml_cluster_inpt_features[OFFSET(3)] AS offset_3,
FROM event_rate_holder

Is there a simpler way of doing this? If not, am I headed in the right direction with the sql query above?


1 Answer


Just use a UDF:

from pyspark.sql.functions import udf
from scipy.spatial import distance
import numpy as np

def euclidean(v1):
    # bind the fixed center v1 and return a one-argument UDF over the vector column
    @udf("double")
    def _(v2):
        # guard against null rows
        return distance.euclidean(v1, v2) if v2 is not None else None
    return _


center = np.array([0.6, 0.7, 0.8])

df_vector.withColumn("dist", euclidean(center)("features")).show()
# +-------------+-----------------+
# |     features|             dist|
# +-------------+-----------------+
# |[1.0,2.0,3.0]|2.586503431275513|
# |[4.0,5.0,6.0]|7.555792479945437|
# +-------------+-----------------+
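
Since euclidean is a closure, the center is bound once and a one-argument UDF comes back, so the same factory can be reused for any other cluster center.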

If you want to disassemble the vectors into separate columns instead, see How to split Vector into columns - using PySpark.

Thanks for the answer; this works fine. But is there a way to avoid the UDF and still get the results? I have read (and experienced) that UDFs are not that efficient and slow down the process. We use around 10 million records per dataset, across multiple datasets, so I am not sure a UDF will be ideal here. – Clock Slave May 15 '18 at 10:29

There isn't. Vectors have no native implementation, so they cannot be accessed directly. This might change in the future (although I wouldn't expect performance gains, only a better API). – Alper t. Turker May 15 '18 at 10:31
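
A note on the last comment: Spark 3.0 later added pyspark.ml.functions.vector_to_array, which converts an ML vector column to a plain array column, so the distance can be expressed with native column arithmetic and no Python UDF. A minimal sketch, assuming Spark 3.0+:

from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col, sqrt

center = [0.6, 0.7, 0.8]

# convert the vector column to an array column, then sum the squared
# differences element by element using native column expressions
arr = vector_to_array(col("features"))
dist = sqrt(sum((arr[i] - c) ** 2 for i, c in enumerate(center)))

df_vector.withColumn("dist", dist).show()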