0

I have a DataFrame containing a column with strings. I want to find similar strings and mark them with some flag. I am using the function ratio from python-Levenshtein module and want to mark strings having a ratio more than 0.90 as "similar". The following is an example of the DataFrame I have:

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat"),
    (3, "Logistic,regression,model,are,neat")
], ["id", "sentence"])

The desired output is:

+---+-----------------------------------+------------+
|id |sentence                           |similar_flag|
+---+-----------------------------------+------------+
|0  |Hi I heard about Spark             |            |
|1  |I wish Java could use case classes |            |
|2  |Logistic regression models are neat|2_0         |
|3  |Logistic regression model is neat  |2_1         |
|4  |Logistics regression model are neat|2_2         |
+---+-----------------------------------+------------+

Where "2_1" means "2" is the "id" of the reference string (first unique string used for matching) and "1" represents the first string that matches with it. I want to avoid for-loops completely. For smaller data, I have used for-loop to achieve the desired result in simple python and want to have same results in PySpark as well, hence I do not want to use any module other than python-Levenshtein. I have come across this approach, but it requires me to give up python-Levenshtein module. Also my DataFrame is likely to be huge (and expected to grow everyday), so this approach might cause memory errors. Is there a better way to achieve the desired result?

Japun Japun
  • 77
  • 11

1 Answers1

3

I would answer in three steps. Firstly, you need to allow the df to look at all options, therefore you might need a Carthesian product of your data using crossJoin, such as:

from pyspark.sql import functions as f

df_new = (
    sentenceDataFrame.crossJoin(
                         sentenceDataFrame.select(
                             f.col('sentence').alias('second_sentence'),
                             f.col('id').alias('second_id')))
)

Secondly, have a look at pyspark.sql.functions.levehstein. Once your sentences are arranged one against the other, add a new column with Levehstein distance using

df_new_with_dist = df_new.withColumn('levehstein_distance',
    f.levenshtein(f.col("sentence"), f.col("second_sentence"))
)

df_new_with_dist.show()

+---+--------------------+--------------------+---------+-------------------+
| id|            sentence|     second_sentence|second_id|levehstein_distance|
+---+--------------------+--------------------+---------+-------------------+
|  0|Hi I heard about ...|Hi I heard about ...|        0|                  0|
|  0|Hi I heard about ...|I wish Java could...|        1|                 27|
|  0|Hi I heard about ...|Logistic,regressi...|        2|                 29|
|  0|Hi I heard about ...|Logistic,regressi...|        3|                 28|
|  1|I wish Java could...|Hi I heard about ...|        0|                 27|
|  1|I wish Java could...|I wish Java could...|        1|                  0|
|  1|I wish Java could...|Logistic,regressi...|        2|                 32|
|  1|I wish Java could...|Logistic,regressi...|        3|                 31|
|  2|Logistic,regressi...|Hi I heard about ...|        0|                 29|
|  2|Logistic,regressi...|I wish Java could...|        1|                 32|
|  2|Logistic,regressi...|Logistic,regressi...|        2|                  0|
|  2|Logistic,regressi...|Logistic,regressi...|        3|                  1|
|  3|Logistic,regressi...|Hi I heard about ...|        0|                 28|
|  3|Logistic,regressi...|I wish Java could...|        1|                 31|
|  3|Logistic,regressi...|Logistic,regressi...|        2|                  1|
|  3|Logistic,regressi...|Logistic,regressi...|        3|                  0|
+---+--------------------+--------------------+---------+-------------------+

Finally, filter out all rows where id == second_id. If you wish to stick to your notation of, for example, 2_1, I recommend you add groupBy(f.col("id")) and aggregate on levehstein_distance with f.min(). Then you can concatenate your IDs, for example with

min_dist_df = (
    df_new_with_dist.where(f.col('id') != f.col('second_id'))
                    .groupBy(f.col('id').alias('second_id'))
                    .agg(f.min(f.col('levehstein_distance')).alias('levehstein_distance'))
)


(
    df_new_with_dist.join(min_dist_df,
                          on=['second_id', 'levehstein_distance'],
                          how='inner')
                    .withColumn('similar_flag', f.concat(f.concat(f.col('id'), f.lit('_'), f.col('second_id'))))
                    .select('id', 'sentence', 'similar_flag')
).show()

+---+--------------------+------------+
| id|            sentence|similar_flag|
+---+--------------------+------------+
|  2|Logistic,regressi...|         2_3|
|  1|I wish Java could...|         1_0|
|  0|Hi I heard about ...|         0_1|
|  3|Logistic,regressi...|         3_2|
+---+--------------------+------------+

While this is not exactly what you asked for, you can filter and tweak on levehstein_distance values to get your desired answer.

Napoleon Borntoparty
  • 1,870
  • 1
  • 8
  • 28
  • Thank you! This is a close enough solution and crossJoin solves the main problem of avoiding for loops. I looked through Spark Python API Docs but couldn't find a function equivalent to Levenshtein Ratio. Levenshtein Distance is also a similar measure but might cause regressions (I have previous results processed using simple python which I need to match). I'll definitely try this solution. – Japun Japun Nov 22 '19 at 11:31