Starting from this example, I used Locality-Sensitive Hashing (LSH) in PySpark to find duplicated documents.
Some notes about my DB: I have 4M text files, and each file has on average 20K chars. Currently, I'm considering only the first 500 chars of each doc.
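The truncation itself isn't shown in the code below; it's roughly this (an illustrative sketch assuming pyspark.sql.functions.substring):

from pyspark.sql.functions import substring
# keep only the first 500 characters of each document before the LSH pipeline
df = df.withColumn("text", substring("text", 1, 500))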
When I increase the number of chars from 500 to 1000, I get memory errors.
I've tried working on the parameters of the pipeline. I know I can avoid the memory errors by increasing n in NGram and decreasing numHashTables in MinHashLSH, but this increases false negatives too much.
Are there any other steps in the pipeline that could improve performance?
My aim is to increase the number of chars from 500 to 2000 without memory errors or very long computation times (ideally, computation time < 6h).
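For context on why fewer hash tables hurts: if I understand Spark's MinHashLSH correctly, a pair only becomes a join candidate if it collides in at least one of the numHashTables tables, so a rough back-of-the-envelope estimate of the false-negative rate looks like this (not measured on my data):

# under OR amplification across hash tables, a pair with Jaccard similarity s
# is missed with probability about (1 - s)**b, where b = numHashTables
s = 0.5                      # similarity at the edge of my 0.5 distance threshold
for b in (10, 3):
    print(b, (1 - s) ** b)   # b=10 -> ~0.001, b=3 -> 0.125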
This is my code with fake data:
# Parameters
# NGram
n_gram = 2  # in practice I use n_gram=8 because I have 500 chars per document
# MinHashLSH
hash_tables = 10  # in practice I use hash_tables=3 to avoid memory errors and overly long computation times
# Jaccard threshold
threshold_test = 0.5
# Fake dataframe
df = spark.createDataFrame([
(0, "Hi I heard about Spark Hi I heard about Spark Hi I heard about Spark Hi I heard about Spark"),
(1, "I wish Java could use case classes I wish Java could use case classes!!"),
(2, "Logistic, regression, models, are, neat, etc, etc, etc, etc, etc, etc, etc, etc"),
(3, "Hi I heard about Spork Hi I heard about Spark Hi I heard about Spark Hi I heard about Spark"),
(4, "Hi I heard about Java Hi I heard about Java Hi I heard about Java Hi I heard about Java")
], ["id", "text"])
# imports for the DataFrame functions used below
from pyspark.sql.functions import regexp_replace, trim, col, length, array, sort_array
# clean punctuation and double spaces
df = df.withColumn("text", regexp_replace('text', r'\p{Punct}', ''))
df = df.withColumn("text", regexp_replace('text', r' (?= |$)', ''))
# trim whitespace and filter out texts that are too short
df = df.withColumn("text", trim(col("text")))\
    .filter((col('text') != "") & (length(col('text')) > n_gram*3))
df.show(5, False)
# LSH pipeline
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, NGram, HashingTF, MinHashLSH
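# db and query are the same DataFrame, so the approxSimilarityJoin below is a self-join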
db = df
query = df
model = Pipeline(stages=[
    # pattern="" splits the text into single characters, so NGram builds character n-grams
    RegexTokenizer(
        pattern="", inputCol="text", outputCol="tokens", minTokenLength=1
    ),
    NGram(n=n_gram, inputCol="tokens", outputCol="ngrams"),
    # hash the n-grams of each document into a sparse term-frequency vector
    HashingTF(inputCol="ngrams", outputCol="vectors"),
    MinHashLSH(inputCol="vectors", outputCol="lsh", numHashTables=hash_tables)]).fit(db)
db_hashed = model.transform(db)
query_hashed = model.transform(query)
output = model.stages[-1].approxSimilarityJoin(db_hashed, query_hashed, threshold_test)
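# approxSimilarityJoin returns the candidate pairs whose Jaccard distance (distCol) is below the threshold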
# similar pairs of documents:
output.filter(col('datasetA.id') != col('datasetB.id'))\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("datasetA.text").alias("textA"),
            col("datasetB.text").alias("textB"),
            col("distCol")).sort(col("distCol"))\
    .withColumn('comb', sort_array(array(*('idA', 'idB')))).dropDuplicates(['comb']).show()