I have the following ordinary scalar PySpark UDF:

@udf()
def redact(colVal: str, offset: int = 0):
    if not colVal or not offset:
        return 'X'*8
    else:
        charList=list(colVal)
        charList[:-offset]='X'*(len(colVal)-offset)
        return "".join(charList)

I am trying to convert this to a pandas_udf, because I have read that vectorized UDFs perform drastically better than scalar UDFs. However, I keep running into pandas-related issues, and I have little experience with pandas.

Please help me convert this UDF to a vectorized pandas UDF.

ASHISH M.G

1 Answer

The redact function can be wrapped inside a function that applies redact on each item of pd.Series.

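Because a pandas UDF receives only column data (each column as a pd.Series), the scalar offset has to be bound through currying: an outer function takes the offset and returns the UDF.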

from pyspark.sql import functions as F
import pandas as pd

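def pandas_wrapper(values: pd.Series, offset: int) -> pd.Series:
    def redact(colVal: str, offset: int = 0):
        # Null/empty value or zero offset: return a fixed 8-character mask
        if not colVal or not offset:
            return 'X'*8
        else:
            # Mask everything except the last `offset` characters
            charList=list(colVal)
            charList[:-offset]='X'*(len(colVal)-offset)
            return "".join(charList)
    return values.apply(lambda value: redact(value, offset))

# Bind the scalar offset in a closure and return a pandas UDF over one string column
def curried_wrapper(offset: int):
    return F.pandas_udf(lambda x: pandas_wrapper(x, offset), "string")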

df = spark.createDataFrame([("abcdef", ), ("12yz", ), (None,)], ("data_col", ))

df.withColumn("redacted", curried_wrapper(2)(F.col("data_col"))).show()

Output

+--------+--------+
|data_col|redacted|
+--------+--------+
|  abcdef|  XXXXef|
|    12yz|    XXyz|
|    null|XXXXXXXX|
+--------+--------+
Nithish
  • Wouldn't this defeat the purpose of using a vectorized pandas UDF (better performance by avoiding applying the function at the single-row level), because it essentially loops over all values with `values.apply(lambda value: redact(value, offset))`? – Kashyap Mar 18 '22 at 21:44
  • Yes, this is true. I don't attempt to vectorize the UDF here, but present a way to invoke a pandas UDF (with a vectorized or non-vectorized implementation). That said, using `pandas_udf` even with non-vectorized logic could give a boost in performance by overcoming the serialization overheads of a non-pandas UDF. – Nithish Mar 20 '22 at 09:06
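
Following up on the comments above, here is a minimal sketch of what a body without the per-row `apply` might look like, using only pandas string methods. The names `redact_series` and `MASK` are hypothetical (they are not from the answer), and the sketch assumes `offset` is a non-negative integer. It is meant to reproduce the behaviour of the original `redact`, including leaving strings shorter than `offset` unchanged. Note that pandas string methods on object columns may still iterate internally, so any speed-up should be measured rather than assumed.

```python
import pandas as pd

# Fallback used by the original redact for null/empty input or a zero offset.
MASK = "X" * 8


def redact_series(values: pd.Series, offset: int) -> pd.Series:
    """Mask all but the last `offset` characters of each string (hypothetical helper)."""
    if not offset:
        # Zero offset: every row gets the fixed mask, as in the original UDF.
        return pd.Series(MASK, index=values.index, dtype=object)

    # Number of characters to mask per row; nulls count as length 0,
    # and strings shorter than `offset` get nothing masked.
    lengths = values.str.len().fillna(0).astype(int)
    n_masked = (lengths - offset).clip(lower=0)

    # Build the "XXXX" prefix per row and keep the last `offset` characters.
    prefix = pd.Series("X", index=values.index).str.repeat(n_masked.tolist())
    tail = values.str[-offset:]
    redacted = prefix + tail

    # Null or empty inputs fall back to the fixed mask.
    empty = values.isna() | (values == "")
    return redacted.mask(empty, MASK)


sample = pd.Series(["abcdef", "12yz", None])
print(redact_series(sample, 2).tolist())
```

To try this from Spark, one option is to swap it into the answer's `curried_wrapper`, i.e. `F.pandas_udf(lambda x: redact_series(x, offset), "string")`.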