10

I am working with PySpark, and wondering if there is any smart way to get the Euclidean distance between one row's array entry and the whole column. For instance, there is a dataset like this.

+--------------------+---+
|            features| id|
+--------------------+---+
|[0,1,2,3,4,5     ...|  0|
|[0,1,2,3,4,5     ...|  1|
|[1,2,3,6,7,8     ...|  2|
+--------------------+---+

Choose one of the rows, e.g. the one with id==1, and calculate the Euclidean distance from it to every row. In this case, the result should be [0, 0, sqrt(1+1+1+9+9+9)], since the features of id==2 differ from those of id==1 by (1,1,1,3,3,3). Can anybody figure out how to do this efficiently? Thanks!

Yong Hyun Kwon
  • Do you need distances between only a single entry and an entire column, or will you also need them between two entire columns? For the first case, it is pretty simple by creating a UDF with that single entry and feeding the column to it. In the second case, it becomes a little complicated. – mayank agrawal Oct 14 '17 at 08:04
  • @mayankagrawal It is the first case I am looking for. I am not so familiar with UDF, so can you give me some hint on that? – Yong Hyun Kwon Oct 14 '17 at 14:40

4 Answers

13

If you want the Euclidean distance between a fixed entry and a column, simply do this.

import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
from scipy.spatial import distance

fixed_entry = [0,3,2,7...] #for example, the entry against which you want distances
distance_udf = F.udf(lambda x: float(distance.euclidean(x, fixed_entry)), FloatType())
df = df.withColumn('distances', distance_udf(F.col('features')))

Your df will have a column of distances.

mayank agrawal
  • File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 71, in return lambda *a: f(*a) File "", line 6, in File "/home/yhkwon/anaconda3/lib/python3.6/site-packages/scipy/spatial/distance.py", line 433, in euclidean dist = norm(u - v) ValueError: operands could not be broadcast together with shapes (31,) (2,) I am trying your method, and this is what I am getting if i try to collect the output. I selected one row from the dataframe for fixed_entry. – Yong Hyun Kwon Oct 15 '17 at 09:51
  • Sorry, I used the Row object instead of the array inside it. Thanks! – Yong Hyun Kwon Oct 15 '17 at 10:06
  • is there any way not using scipy library? – Yong Hyun Kwon Oct 16 '17 at 00:43
  • @staticmethod I created my own function like this not to use scipy library def euclidean(x, y): distance = 0 for i in range(len(x)): distance += ((x[i] - y[i]) * (x[i] - y[i])) return float(math.sqrt(distance)) – Yong Hyun Kwon Oct 16 '17 at 02:02
  • Yes, you can avoid the use of scipy. If numpy is fine with you, use dist = numpy.linalg.norm(a-b). If you don't want to use any library, your method is fine, but I doubt it will be faster than the libraries. Anyway, you can write the for loop in a single line like this: math.sqrt(sum((a - b)**2 for a, b in zip(a, b))) (see the sketch below). Check this out: https://stackoverflow.com/a/4169284/7045987 – mayank agrawal Oct 16 '17 at 07:23
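
A minimal sketch of the numpy variant suggested in these comments, reusing the UDF pattern from the answer above; the column name features and the fixed_entry values are assumptions carried over from the question:

import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType

fixed_entry = [0, 1, 2, 3, 4, 5]  # hypothetical reference row, as in the question

# numpy computes sqrt(sum((x - fixed_entry)**2)) without scipy
numpy_dist_udf = F.udf(lambda x: float(np.linalg.norm(np.array(x) - np.array(fixed_entry))), FloatType())
df = df.withColumn('distances', numpy_dist_udf(F.col('features')))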
4

You can do a BucketedRandomProjectionLSH [1] to get a Cartesian self-join of distances within your data frame.

from pyspark.ml.feature import BucketedRandomProjectionLSH

brp = BucketedRandomProjectionLSH(
    inputCol="features", outputCol="hashes", seed=12345, bucketLength=1.0
)
model = brp.fit(df)
model.approxSimilarityJoin(df, df, 3.0, distCol="EuclideanDistance")
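
The join returns one row per pair whose distance is below the threshold, with the original rows nested in the datasetA and datasetB struct columns. A minimal sketch of flattening that result, assuming the id column from the question:

from pyspark.sql import functions as F

joined = model.approxSimilarityJoin(df, df, 3.0, distCol="EuclideanDistance")
joined.select(
    F.col("datasetA.id").alias("id_a"),
    F.col("datasetB.id").alias("id_b"),
    "EuclideanDistance",
).show()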

You can also get distances from one row to the whole column with approxNearestNeighbors [2], but the results are limited by numNearestNeighbors, so you could give it the count of the entire data frame.

one_row = df.where(df.id == 1).first().features
model.approxNearestNeighbors(df, one_row, df.count()).collect()

Also, make sure to convert your data to Vectors!

from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors, VectorUDT

to_dense_vector = F.udf(Vectors.dense, VectorUDT())
df = df.withColumn('features', to_dense_vector('features'))

[1] https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html?highlight=approx#pyspark.ml.feature.BucketedRandomProjectionLSH

[2] https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html?highlight=approx#pyspark.ml.feature.BucketedRandomProjectionLSHModel.approxNearestNeighbors

munro
  • For approxSimilarityJoin, 3.0 is the threshold parameter above which all distances will be rejected. So it will not give a complete array with all distances. – mayank agrawal Oct 14 '17 at 07:58
  • I am actually using this library, but there is a problem when the dataset is too small. I am working on oversampling, and when the minority class is too small (e.g. 5), if I do the BucketedRandomProjectionLSH, I think only one element is placed in the bucket and it has no other neighbor. That's why I am looking for brute force. Any solution to this? – Yong Hyun Kwon Oct 14 '17 at 14:37
2

Here is an implementation using the SQL function power() to compute the Euclidean distance between matching rows in two dataframes.

cols2Join = ['Key1','Key2']
colsFeature =['Feature1','Feature2','Feature3','Feature4']
columns = cols2Join + colsFeature

valuesA = [('key1value1','key2value1',111,22,33,.334),('key1value3','key2value3', 333,444,12,.445),('key1value5','key2value5',555,666,101,.99),('key1value7','key2value7',777,888,10,.019)]
table1 = spark.createDataFrame(valuesA,columns)
valuesB = [('key1value1','key2value1',22,33,3,.1),('key1value3','key2value3', 88,99,4,1.23),('key1value5','key2value5',4,44,1,.998),('key1value7','key2value7',9,99,1,.3)]
table2 = spark.createDataFrame(valuesB, columns)

# Create the SQL expression with a list comprehension; the SQL function power computes the Euclidean distance inline
beginExpr = 'power(('
InnerExpr = ['power((a.{}-b.{}),2)'.format(x, x) for x in colsFeature]
InnerExpr = '+'.join(InnerExpr)
endExpr = '),0.5) AS EuclideanDistance'
distanceExpr = beginExpr + InnerExpr + endExpr
Expr = cols2Join + [distanceExpr]

# Now just join the tables and use selectExpr to get the Euclidean distance
outDF = table1.alias('a').join(table2.alias('b'), cols2Join, how="inner").selectExpr(Expr)

display(outDF)
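
For reference, with the feature columns above the generated expression is power((power((a.Feature1-b.Feature1),2)+power((a.Feature2-b.Feature2),2)+power((a.Feature3-b.Feature3),2)+power((a.Feature4-b.Feature4),2)),0.5) AS EuclideanDistance.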
Narendra Rana
0

If you need to find the Euclidean distance between only one particular row and every other row in the dataframe, then you can filter & collect that row and pass it to a udf, as sketched below.
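
A minimal sketch of that first case, assuming the id and features columns from the question and a plain-Python metric so no extra library is needed:

 import math
 import pyspark.sql.functions as f
 import pyspark.sql.types as t

 # collect the reference row (hypothetically id == 1) to the driver
 target = df.filter(f.col("id") == 1).first().features

 # compare every row's features against the collected entry
 dist_udf = f.udf(lambda v: float(math.sqrt(sum((a - b)**2 for a, b in zip(v, target)))), t.FloatType())
 df = df.withColumn("dist", dist_udf("features"))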

But if you need to calculate the distance between all pairs, you need to use a join.
Repartition the dataframe by id; it will speed up the join operation. There is no need to calculate the full pairwise matrix, just calculate the upper or lower half and replicate it. I wrote a function for myself based on this logic.

 from pyspark.sql import functions as f
 from pyspark.sql import types as t

 df = df.repartition("id")
 df.cache()
 df.show()


 #metric = any callable function to calculate distance b/w two vectors
 def pairwise_metric(Y, metric, col_name="metric"):

     Y2 = Y.select(f.col("id").alias("id2"), 
                 f.col("features").alias("features2"))

     # join to create lower or upper half
     Y = Y.join(Y2, Y.id < Y2.id2, "inner")

     def sort_list(x):

         x = sorted(x, key=lambda y:y[0])
         x = list(map(lambda y:y[1], x))

         return(x)

     udf_diff = f.udf(lambda x,y: metric(x,y), t.FloatType())
     udf_sort = f.udf(sort_list, t.ArrayType(t.FloatType()))

     Yid = Y2.select("id2").distinct().select("id2", 
          f.col("id2").alias("id")).withColumn("dist", f.lit(0.0))

     Y = Y.withColumn("dist", udf_diff("features", 
              "features2")).drop("features","features2")

     # just swap the column names and take union to get the other half
     Y =Y.union(Y.select(f.col("id2").alias("id"),
          f.col("id").alias("id2"), "dist"))
     # union for the diagonal elements of distance matrix
     Y = Y.union(Yid)

     st1 = f.struct(["id2", "dist"]).alias("vals")
     # groupby , aggregate and sort
     Y = (Y.select("id",st1).groupBy("id").agg(f.collect_list("vals").
                             alias("vals")).withColumn("dist",udf_sort("vals")).drop("vals"))

     return(Y.select(f.col("id").alias("id1"), f.col("dist").alias(col_name)))
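
A hypothetical usage example, pairing pairwise_metric with the plain-Python Euclidean metric discussed in the comments on the accepted answer:

 import math

 def euclidean(x, y):
     return float(math.sqrt(sum((a - b)**2 for a, b in zip(x, y))))

 dist_df = pairwise_metric(df, euclidean, col_name="euclidean")
 dist_df.show()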
pauli