
I want to use some string similarity functions that are not native to PySpark, such as the Jaro and Jaro-Winkler measures, on DataFrames. These are readily available in Python modules such as jellyfish. I can write PySpark UDFs fine for cases where there are no null values present, e.g. comparing cat to dog, but when I apply these UDFs to data where null values are present, they fail. In problems like the one I'm solving, it is very common for one of the strings to be null.

I need help getting my string similarity UDF to work in general; more specifically, to handle cases where one of the values is null.

I wrote a UDF that works when there are no null values in the input data:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as F
import jellyfish

def jaro_winkler_func(df, column_left, column_right):

    jaro_winkler_udf = udf(f=lambda s1, s2: jellyfish.jaro_winkler(s1, s2), returnType=DoubleType())

    df = (df
          .withColumn('test',
                      jaro_winkler_udf(df[column_left], df[column_right])))

    return df

Example input and output:

+-----------+------------+
|string_left|string_right|
+-----------+------------+
|       dude|         dud|
|       spud|         dud|
+-----------+------------+
+-----------+------------+------------------+
|string_left|string_right|              test|
+-----------+------------+------------------+
|       dude|         dud|0.9166666666666666|
|       spud|         dud|0.7222222222222222|
+-----------+------------+------------------+
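For reproducibility, the example above can be built and run like this (a minimal sketch, assuming an active SparkSession named spark):

df = spark.createDataFrame(
    [('dude', 'dud'), ('spud', 'dud')],
    ['string_left', 'string_right'],
)

# Appends the 'test' column shown in the second table.
jaro_winkler_func(df, 'string_left', 'string_right').show()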

When I run this on data that has null values, I get the usual reams of Spark errors; the most relevant one seems to be `TypeError: str argument expected`. I assume this is caused by the null values, since the function worked when there were none.
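If the nulls are indeed the cause, calling the underlying function directly with a None argument should reproduce the error outside Spark (a quick check, assuming jellyfish is installed):

import jellyfish

jellyfish.jaro_winkler('spud', 'dud')  # 0.7222222222222222
jellyfish.jaro_winkler('spud', None)   # raises TypeError: str argument expected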

I modified the function above to check that both values are not null and only run the comparison in that case, otherwise returning 0.0:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as F
import jellyfish

def jaro_winkler_func(df, column_left, column_right):

    jaro_winkler_udf = udf(f=lambda s1, s2: jellyfish.jaro_winkler(s1, s2), returnType=DoubleType())

    df = (df
          .withColumn('test',
                      F.when(df[column_left].isNotNull() & df[column_right].isNotNull(),
                             jaro_winkler_udf(df[column_left], df[column_right]))
                      .otherwise(0.0)))

    return df

However, I still get the same errors as before.

Sample input and what I would like the output to be:

+-----------+------------+
|string_left|string_right|
+-----------+------------+
|       dude|         dud|
|       spud|         dud|
|       spud|        null|
|       null|        null|
+-----------+------------+
+-----------+------------+------------------+
|string_left|string_right|              test|
+-----------+------------+------------------+
|       dude|         dud|0.9166666666666666|
|       spud|         dud|0.7222222222222222|
|       spud|        null|               0.0|
|       null|        null|               0.0|
+-----------+------------+------------------+
  • Have you tried replacing nulls with empty strings? – Chris May 07 '19 at 15:19
  • @Chris I have thought of it but not tried it. Even if it did work, I don't think it's a great solution, because I'd have to convert nulls to empty strings, run all the comparisons, then turn them back to nulls. Also, when this scales up to tens or hundreds of billions of records, I'd rather not have it comparing tons of empty strings to each other only to get a zero value that is already known, since one of the values is null. – Auren Ferguson May 07 '19 at 15:24
  • You need to make your `udf` robust to bad inputs. See the explanation in [this answer](https://stackoverflow.com/a/49634840/5858851), but essentially you need something like: `udf(f=lambda s1, s2: jellyfish.jaro_winkler(s1, s2) if s1 is not None and s2 is not None else None, returnType=DoubleType())` (written out as a self-contained sketch after these comments). – pault May 07 '19 at 15:26
  • Possible duplicate of [Using UDF ignores condition in when](https://stackoverflow.com/questions/49634651/using-udf-ignores-condition-in-when) – pault May 07 '19 at 15:27
  • @pault This does the trick. No idea how it will perform, though. I may need to write a Scala function and then somehow get it to play nice with pyspark. – Auren Ferguson May 07 '19 at 15:49
  • @AurenFerguson this won't be noticeably worse than your original `udf`. You'd get a performance boost by trying to implement `jellyfish.jaro_winkler` with the DataFrame API functions. As a side note, I'd recommend you accept the dupe target since it answers your question. – pault May 07 '19 at 15:52
  • @pault I'm not exactly sure what you mean by "You'd get a performance boost by trying to implement jellyfish.jaro_winkler with the DataFrame API functions". I'm not familiar with this. – Auren Ferguson May 07 '19 at 15:58
  • @AurenFerguson after looking at the function, I'm not sure if it's even possible to do so in an efficient way. If you're willing to use a different distance metric, the levenshtein distance is implemented in [`pyspark.sql.functions.levenshtein`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.levenshtein). Read more on [spark functions vs. udf performance](https://stackoverflow.com/questions/38296609/spark-functions-vs-udf-performance/38297050#38297050). – pault May 07 '19 at 16:19
  • @pault I'm aware that levenshtein exists and I already use it. I just thought there was a better way of using other similarity functions such as Jaro-Winkler. – Auren Ferguson May 07 '19 at 16:22
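pault's guarded UDF from the comments, written out as a self-contained sketch (it returns None for missing inputs; swap in 0.0 to match the desired output above):

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import jellyfish

# Guard against None inside the udf itself, since Spark can still evaluate
# a udf on rows that an F.when condition would otherwise exclude.
jaro_winkler_udf = udf(
    f=lambda s1, s2: (
        jellyfish.jaro_winkler(s1, s2)
        if s1 is not None and s2 is not None
        else None  # use 0.0 here to get a zero score instead of null
    ),
    returnType=DoubleType(),
)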

1 Answer


We will modify your code a little bit, and it should work fine:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import jellyfish


@udf(DoubleType())
def jaro_winkler(s1, s2):
    # Spark passes SQL nulls to the udf as Python None, so guard here.
    if not all((s1, s2)):  # or: if None in (s1, s2):
        out = 0.0  # must be a float; returning an int 0 from a DoubleType udf yields null
    else:
        out = jellyfish.jaro_winkler(s1, s2)
    return out


def jaro_winkler_func(df, column_left, column_right):
    df = df.withColumn("test", jaro_winkler(df[column_left], df[column_right]))
    return df

Depending on the expected behavior, you need to change the test:

  • `if not all((s1, s2)):` will return 0.0 for both null and empty string ''.
  • `if None in (s1, s2):` will return 0.0 only for null.
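A quick end-to-end check on the sample data (a sketch, assuming an active SparkSession named spark):

df = spark.createDataFrame(
    [('dude', 'dud'), ('spud', 'dud'), ('spud', None), (None, None)],
    ['string_left', 'string_right'],
)

jaro_winkler_func(df, 'string_left', 'string_right').show()
# +-----------+------------+------------------+
# |string_left|string_right|              test|
# +-----------+------------+------------------+
# |       dude|         dud|0.9166666666666666|
# |       spud|         dud|0.7222222222222222|
# |       spud|        null|               0.0|
# |       null|        null|               0.0|
# +-----------+------------+------------------+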