
I was following this SO post, Efficient string matching in Apache Spark, to do fuzzy string matching using the LSH algorithm. For some reason I get results through the Python API, but not in Scala, and I really can't see what is missing in the Scala code.

Here are both versions:

from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, NGram, HashingTF, MinHashLSH

query = spark.createDataFrame(["Bob Jones"], "string").toDF("text")

db = spark.createDataFrame(["Tim Jones"], "string").toDF("text")

model = Pipeline(stages=[
    RegexTokenizer(
        pattern="", inputCol="text", outputCol="tokens", minTokenLength=1
    ),
    NGram(n=3, inputCol="tokens", outputCol="ngrams"),
    HashingTF(inputCol="ngrams", outputCol="vectors"),
    MinHashLSH(inputCol="vectors", outputCol="lsh")
]).fit(db)

db_hashed = model.transform(db)
query_hashed = model.transform(query)

model.stages[-1].approxSimilarityJoin(db_hashed, query_hashed, 0.75).show()

And it returns:

> +--------------------+--------------------+-------+
> |            datasetA|            datasetB|distCol|
> +--------------------+--------------------+-------+
> |[Tim Jones, [t, i...|[Bob Jones, [b, o...|    0.6|
> +--------------------+--------------------+-------+
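As a sanity check on that 0.6, here is a sketch outside Spark, assuming distCol is the Jaccard distance over the character-trigram sets this pipeline builds (RegexTokenizer with pattern="" lowercases and splits into single characters, spaces included, then NGram(n=3) joins them with spaces):

```python
# Reproduce the pipeline's feature extraction in plain Python.
def char_trigrams(text):
    chars = list(text.lower())
    # NGram joins tokens with a single space, so mirror that here.
    return {" ".join(chars[i:i + 3]) for i in range(len(chars) - 2)}

a = char_trigrams("Tim Jones")
b = char_trigrams("Bob Jones")

# Jaccard distance = 1 - |A ∩ B| / |A ∪ B|
distance = 1 - len(a & b) / len(a | b)
print(distance)  # 0.6, matching Spark's distCol
```

The two names share the trailing trigrams of " jones" (4 of the 10 distinct trigrams), which gives Jaccard similarity 0.4 and hence distance 0.6.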

However, the Scala version returns nothing. Here is the code:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{RegexTokenizer, NGram, HashingTF, MinHashLSH, MinHashLSHModel}

val tokenizer = new RegexTokenizer().setPattern("").setInputCol("text").setMinTokenLength(1).setOutputCol("tokens")
val ngram = new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams")
val vectorizer = new HashingTF().setInputCol("ngrams").setOutputCol("vectors")
val lsh = new MinHashLSH().setInputCol("vectors").setOutputCol("lsh")
val pipeline = new Pipeline().setStages(Array(tokenizer, ngram, vectorizer, lsh))
val query = Seq("Bob Jones").toDF("text")
val db = Seq("Tim Jones").toDF("text")
val model = pipeline.fit(db)
val dbHashed = model.transform(db)
val queryHashed = model.transform(query)
model.stages.last.asInstanceOf[MinHashLSHModel].approxSimilarityJoin(dbHashed, queryHashed, 0.75).show

I am using Spark 3.0. I know it's only a toy example, but I can't really test it on a different version, and I doubt there is a bug like that :)

Tomasz Krol
  • I've found that when I added setNumHashTables(10), the Scala code returned results. But I still don't understand why Python returns results without it. – Tomasz Krol Nov 13 '19 at 05:35
  • Also found that the Scala code works without setting the number of hash tables in Spark 2.4.4, so apparently something changed in 3.0 – Tomasz Krol Nov 13 '19 at 09:52

1 Answer


This code will work in Spark 3.0.1 if you set numHashTables correctly.

val lsh = new MinHashLSH().setInputCol("vectors").setOutputCol("lsh").setNumHashTables(3)
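A plausible reason why numHashTables matters here (my reading of MinHashLSH's candidate generation, not stated in the answer): approxSimilarityJoin only computes distances for pairs that collide in at least one hash table, and a single MinHash matches with probability equal to the pair's Jaccard similarity. With OR-amplification across k tables, the chance a pair becomes a join candidate is roughly 1 - (1 - s)^k. A quick sketch:

```python
# Probability that two sets with Jaccard similarity s collide in at
# least one of k independent MinHash tables (OR-amplification):
# each single MinHash matches with probability s.
def collision_prob(s, k):
    return 1 - (1 - s) ** k

s = 0.4  # "Tim Jones" vs "Bob Jones": distance 0.6 => similarity 0.4
for k in (1, 3, 10):
    print(k, collision_prob(s, k))
# k=1  -> the pair is found only 40% of the time
# k=10 -> it is almost always a join candidate
```

On this reading, with few hash tables the pair simply never collides, so the join returns empty even though its true distance (0.6) is under the 0.75 threshold; raising numHashTables makes a collision, and therefore a result, far more likely.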
Suraj Rao