I've read the Spark section on Locality Sensitive Hashing and still don't understand some of it:

https://spark.apache.org/docs/latest/ml-features.html#locality-sensitive-hashing

The docs give a Bucketed Random Projection example for two DataFrames. I have one simple spatial dataset of points (later I will have millions of points), and the DataFrame looks like:

id  X        Y
1   11.6133  48.1075
2   11.6142  48.1066
3   11.6108  48.1061
4   11.6207  48.1192
5   11.6221  48.1223
6   11.5969  48.1276
7   11.5995  48.1258
8   11.6127  48.1066
9   11.6430  48.1275
10  11.6368  48.1278
11  11.5930  48.1156

My question is: how do I put points that are close to each other into the same groups, so that my original DataFrame gets an additional column with these hashes / groups?

Best, Marcin

cincin21

2 Answers

BucketedRandomProjectionLSH does exactly what you need. The resulting hash for each point can serve as a group value. The only problem is selecting a proper radius, which sets the size of each bucket; use .setBucketLength(0.02) to set it. The other small problem is extracting the hash from the vector into a column. I use this method: Spark Scala: How to convert Dataframe[vector] to DataFrame[f1:Double, ..., fn: Double)]

Example with your Data

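import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf
// spark-shell imports this automatically; needed elsewhere for toDF and $"..."
import spark.implicits._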

val dfA = spark.createDataFrame(Seq(
  (1, Vectors.dense(11.6133, 48.1075)),
  (2, Vectors.dense(11.6142, 48.1066)),
  (3, Vectors.dense(11.6108, 48.1061)),
  (4, Vectors.dense(11.6207, 48.1192)),
  (5, Vectors.dense(11.6221, 48.1223)),
  (6, Vectors.dense(11.5969, 48.1276)),
  (7, Vectors.dense(11.5995, 48.1258)),
  (8, Vectors.dense(11.6127, 48.1066)),
  (9, Vectors.dense(11.6430, 48.1275)),
  (10, Vectors.dense(11.6368, 48.1278)),
  (11, Vectors.dense(11.5930, 48.1156))
  )).toDF("id", "coord")

val brp = new BucketedRandomProjectionLSH()
  .setBucketLength(0.02)
  .setNumHashTables(1)
  .setInputCol("coord")
  .setOutputCol("hashes")
val model = brp.fit(dfA)

val res = model.transform(dfA)

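// "hashes" is an array of vectors (one per hash table); take the first
// vector, convert it to an array, and read its single value.
val vecToSeq = udf((v: Vector) => v.toArray)

res.select($"id", vecToSeq($"hashes"(0))(0) as "bucket").show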

The output gives 2 groups for radius 0.02:

  +---+------+
  | id|bucket|
  +---+------+
  |  1|2473.0|
  |  2|2473.0|
  |  3|2473.0|
  |  4|2474.0|
  |  5|2474.0|
  |  6|2474.0|
  |  7|2474.0|
  |  8|2473.0|
  |  9|2474.0|
  | 10|2474.0|
  | 11|2473.0|
  +---+------+
Artem Aliev
  • Great, I will test it tomorrow. How are the numbers in the bucket (2473, 2474) created (why not 1, 2, ...)? – cincin21 Feb 03 '20 at 18:04
  • See the algorithm documentation: https://spark.apache.org/docs/latest/ml-features.html#locality-sensitive-hashing. The entire space is divided into groups (bands, since the hash is the same along each band). Your points fall into these two bands. It may be better to use two hashes to get parallelograms, or more hashes to get polygons. – Artem Aliev Feb 04 '20 at 10:00
  • I'm trying to rewrite your code in PySpark and I have a problem with the last two lines of code. Could you help? – cincin21 Feb 04 '20 at 10:14
  • Both yours and @Oli solutions are great, thank you for help! – cincin21 Feb 04 '20 at 11:21
  • A question on the radius: is .setBucketLength(0.02) in radians, km, or something else? I've read the specs but it's not clear to me. – cincin21 Feb 07 '20 at 12:01
  • It is the radius, in the units of the coordinates you use. By the way, you should not think of it as a real radius. It is a magic number that you can change to get more or fewer unique hashes. – Artem Aliev Feb 07 '20 at 12:18
  • I'm trying to figure out a metric for this, as I want to load a different dataset and then be clear on this value. – cincin21 Feb 07 '20 at 12:23
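
To make the comments above about the bucket numbers and the radius more concrete, here is a minimal sketch in plain Scala (no Spark needed). It assumes the hash is floor(x · v / bucketLength), as given in the linked documentation. The helper name brpHash and the unit vector (0.6, 0.8) are made up for illustration; Spark draws its own random unit vector when fitting, so the actual bucket numbers will differ.

```scala
// Hypothetical helper, not part of the Spark API: applies the formula
// floor((x . v) / bucketLength) for a single unit vector v.
def brpHash(x: Array[Double], v: Array[Double], bucketLength: Double): Double = {
  require(x.length == v.length, "point and projection must have the same dimension")
  val dot = x.zip(v).map { case (a, b) => a * b }.sum
  math.floor(dot / bucketLength)
}

// Assumed unit vector for illustration only (0.6^2 + 0.8^2 = 1).
val v = Array(0.6, 0.8)

// Points 1, 2 and 9 from the question, hashed with bucket length 0.02.
val h1 = brpHash(Array(11.6133, 48.1075), v, 0.02)
val h2 = brpHash(Array(11.6142, 48.1066), v, 0.02)
val h9 = brpHash(Array(11.6430, 48.1275), v, 0.02)

// With this particular v, the nearby points 1 and 2 share a bucket
// and the more distant point 9 does not.
println(Seq(h1, h2, h9).mkString(", "))
```

The bucket number is just the projected coordinate divided by the bucket length, which is why it is in the thousands rather than 1, 2, ...: the points lie far from the origin compared with 0.02. It also shows why the bucket length is expressed in the same units as the input coordinates. With .setNumHashTables(2), each point gets two such values, and the pair could probably be used as a cell key, as suggested above.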

Here is a bit of Scala code that performs LSH. Basically, the LSH needs an assembled vector column, which you can construct with a VectorAssembler.

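import org.apache.spark.ml.feature.{BucketedRandomProjectionLSH, VectorAssembler}
import org.apache.spark.sql.functions.{col, udf}
// spark-shell imports this automatically; needed elsewhere for toDF
import spark.implicits._

// constructing the DataFrame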
val data= """1   11.6133  48.1075
2   11.6142  48.1066
3   11.6108  48.1061
4   11.6207  48.1192
5   11.6221  48.1223
6   11.5969  48.1276
7   11.5995  48.1258
8   11.6127  48.1066
9   11.6430  48.1275
10  11.6368  48.1278
11  11.5930  48.1156"""
val df = data
    .split("\\s*\\n\\s*")
    .map( _.split("\\s+") match {
        case Array(a, b, c) => (a.toInt,b.toDouble,c.toDouble)
    })
    .toSeq
    .toDF("id", "X", "Y")

val assembler = new VectorAssembler()
    .setInputCols(Array("X", "Y"))
    .setOutputCol("v")
val df2 = assembler.transform(df)
val lsh = new BucketedRandomProjectionLSH()
    .setInputCol("v")
    .setBucketLength(1e-3) // change that according to your use case
    .setOutputCol("lsh")
val result = lsh.fit(df2).transform(df2).orderBy("lsh")

// the lsh is in an array of vectors. To extract the double, we can use
// getItem for the array and a UDF for the vector.
val extract = udf((vector : org.apache.spark.ml.linalg.Vector) => vector(0))
result.withColumn("lsh", extract(col("lsh").getItem(0))).show(false)
Oli
  • Great, I will test it tomorrow and give my feedback! – cincin21 Feb 03 '20 at 18:04
  • It works, and I was even able to rewrite it in PySpark with no problem, but one question: when I write the data to CSV, the "lsh" column is written like "[DenseVector([21188.0]". How do I get only the "21188" from it? – cincin21 Feb 04 '20 at 10:16
  • I edited my answer with a way to extract the value from the array of vectors. – Oli Feb 04 '20 at 10:32
  • Nice! Maybe you know how to rewrite this in PySpark? I'm trying with a lambda function but I get "'DenseVector' object is not callable". – cincin21 Feb 04 '20 at 10:39
  • Try `extract = F.udf(lambda v : v[0])` and `result.withColumn("lsh", extract(F.col('lsh').getItem(0)))`. I have not tested it but that should do it. – Oli Feb 04 '20 at 10:48
  • In PySpark, easy things sometimes seem not to work. Still, this worked: `firstElement=F.udf(lambda v:float(v[0]), T.FloatType())` and then `x.withColumn("lsh", firstElement(F.col('lsh').getItem(0))).show()` – cincin21 Feb 04 '20 at 11:19
  • Nice! By the way, just a thought, if LSH does not give you satisfactory results in your situation, you may want to try clustering, it seems more fitting considering your example (but I might also be wrong ;) ). – Oli Feb 04 '20 at 12:55
  • I will have millions of points, so my thought is to do LSH, then loop over the data by the created bucket groups and run DBSCAN on each (as it is not natively implemented in Spark). – cincin21 Feb 04 '20 at 13:07
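
The "LSH first, then cluster inside each bucket" idea from the last comment could look roughly like this in plain Scala collections. This is only a sketch under assumptions: the Pt case class and clusterLocally are hypothetical names, and clusterLocally is a stand-in for a real DBSCAN routine (here it just returns the sorted ids of the group). The bucket values are the ones from the first answer's output.

```scala
// Hypothetical row type: a point plus the bucket id produced by the LSH step.
case class Pt(id: Int, x: Double, y: Double, bucket: Double)

// Stand-in for a real local clustering routine such as DBSCAN (an assumption,
// not an existing API): it only returns the ids of the group, sorted.
def clusterLocally(group: Seq[Pt]): Seq[Int] = group.map(_.id).sorted

val pts = Seq(
  Pt(1, 11.6133, 48.1075, 2473.0),
  Pt(2, 11.6142, 48.1066, 2473.0),
  Pt(4, 11.6207, 48.1192, 2474.0),
  Pt(5, 11.6221, 48.1223, 2474.0)
)

// Group by bucket, then process every bucket independently of the others.
val perBucket: Map[Double, Seq[Int]] =
  pts.groupBy(_.bucket).map { case (b, g) => b -> clusterLocally(g) }

println(perBucket)
```

On a Spark Dataset the same shape would probably be groupByKey on the bucket followed by flatMapGroups. One caveat worth keeping in mind: two points that are close together but fall on opposite sides of a bucket border end up in different groups, so clusters that straddle a border may get split.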