
I have a dataframe with a text column CALL_TRANSCRIPT (string) and a pii_allmethods column (array of strings). I'm trying to search CALL_TRANSCRIPT for the strings in the array and mask them using a PySpark pandas UDF, but I'm getting an "outputted more rows than input rows" error. I've tried a couple of ways to troubleshoot, but haven't been able to resolve it.

The inner for loop goes through the pii_list array and replaces each occurrence in the call transcript (the text variable) with a mask value. The yield happens after the inner loop is done, so it's not clear to me why the UDF would return more rows than input.

NOTE: I have a Spark UDF that works; I'm trying a pandas UDF for performance improvements.

dfs = dfs.withColumn('FULL_TRANSCRIPT',
                     pu_mask_all_pii(col("CALL_TRANSCRIPT"), col("pii_allmethods")))

**Pandas UDF function:**

@pandas_udf("string")
def pu_mask_all_pii(iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for text, pii_list in iterator:
        pii_list = sorted(pii_list, key=len, reverse=True)
        strtext = str(text)
        for pii in pii_list:
            if len(pii) > 1:
                mask = len(pii) * 'X'
                strtext = str(re.sub(re.escape(pii), mask, strtext.encode(), flags=re.IGNORECASE))
        yield strtext


**PythonException:** An exception was thrown from a UDF: 'AssertionError: Pandas SCALAR_ITER UDF outputted more rows than input rows.'. Full traceback below:

1 Answer


Setup

df.show()

+--------------------+-------------------+
|     CALL_TRANSCRIPT|     pii_allmethods|
+--------------------+-------------------+
|foo bar <name> ba...|  [<name>, <phone>]|
|    xyz defgh <name>|[<name>, <address>]|
|                 pqr|          [<phone>]|
+--------------------+-------------------+
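(Aside: a minimal sketch to reproduce this example dataframe, assuming an active SparkSession named spark; the first transcript string is a guess, since df.show() truncates it.)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        ('foo bar <name> baz <phone>', ['<name>', '<phone>']),  # full text guessed
        ('xyz defgh <name>', ['<name>', '<address>']),
        ('pqr', ['<phone>']),
    ],
    ['CALL_TRANSCRIPT', 'pii_allmethods'],
)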

Solution

There's no need for a complex pandas UDF here; instead, you can simply use a Spark UDF to operate on one row at a time and perform the replacement:

import re

from pyspark.sql import functions as F

@F.udf
def mask(text, tokens):
    # build one alternation pattern from all tokens, escaping regex metacharacters
    pat = '|'.join(map(re.escape, tokens))
    # replace each match with a run of 'X' of the same length as the match
    return re.sub(pat, lambda g: 'X' * len(g.group()), text, flags=re.IGNORECASE)

df = df.withColumn('FULL_TRANSCRIPT', mask('CALL_TRANSCRIPT', 'pii_allmethods'))
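One design note on the pattern: Python's re alternation tries branches left to right rather than preferring the longest match, so if one PII token can be a prefix of another, the shorter token may win and leave part of the longer one unmasked. If that can happen in your data, sort the tokens by length descending (as the question's code did) before joining, e.g. pat = '|'.join(map(re.escape, sorted(tokens, key=len, reverse=True))).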

Alternative solution (using Pandas UDF batching)

from typing import Iterator, Tuple

import pandas as pd

@F.pandas_udf('string')
def mask(iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    # a: a pandas Series of CALL_TRANSCRIPT values for this batch
    # b: a pandas Series of pii_allmethods arrays for this batch
    for a, b in iterator:
        result = []

        # zip the series to perform the replacement one row at a time
        for text, tokens in zip(a, b):
            pat = '|'.join(map(re.escape, tokens))
            text = re.sub(pat, lambda g: 'X' * len(g.group()), text, flags=re.IGNORECASE)
            result.append(text)

        # yield one pd.Series per batch: exactly one output row per input row
        yield pd.Series(result)

df = df.withColumn('FULL_TRANSCRIPT', mask('CALL_TRANSCRIPT', 'pii_allmethods'))
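Note the contrast with the question's code: in the iterator-of-series variant, each yield is expected to be a pd.Series covering one whole input batch. Yielding a plain string once per row, as the original UDF did, lets the cumulative output row count run ahead of the input batches consumed so far, which appears to be what trips the 'outputted more rows than input rows' assertion.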

Result

df.show()

+--------------------+-------------------+--------------------+
|     CALL_TRANSCRIPT|     pii_allmethods|     FULL_TRANSCRIPT|
+--------------------+-------------------+--------------------+
|foo bar <name> ba...|  [<name>, <phone>]|foo bar XXXXXX ba...|
|    xyz defgh <name>|[<name>, <address>]|    xyz defgh XXXXXX|
|                 pqr|          [<phone>]|                 pqr|
+--------------------+-------------------+--------------------+
  • Trying to avoid row-by-row processing, so I'm exploring pandas UDF, converting from the Spark UDF. Thank you for the above. – Mohan Rayapuvari Mar 26 '23 at 13:37
  • Honestly, I don't see a point in using a pandas UDF here if you are concerned about performance; having said that, I can provide the solution if you still need it. – Shubham Sharma Mar 26 '23 at 15:54
  • That would be great. I am exploring pandas UDF since the Spark UDF takes longer, and I would also like to use a pandas UDF for another module, 'flairnlp', so I'm trying here first to explore how the pandas UDF iterator->iterator variant works to process the UDF in batches: https://docs.databricks.com/udf/pandas.html#iterator-of-series-to-iterator-of-series-udf – Mohan Rayapuvari Mar 26 '23 at 16:02
  • No problem, but in this particular case I think a Spark UDF is a much better choice. – Shubham Sharma Mar 26 '23 at 16:06
  • It works perfectly. I was getting a series from the first loop and then not looping over it further for processing; that was the issue. Thank you! – Mohan Rayapuvari Mar 26 '23 at 21:07
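As a side note on the iterator->iterator variant mentioned in the comments: its main payoff is amortizing expensive per-task setup (such as loading an NLP model) across all batches, rather than speeding up per-row regex work. A minimal sketch of that pattern, where load_model() and model.annotate() are hypothetical stand-ins for your own loader and inference call:

from typing import Iterator

import pandas as pd
from pyspark.sql import functions as F

@F.pandas_udf('string')
def tag_text(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # hypothetical expensive setup (e.g. loading a flair NLP model),
    # run once per task instead of once per row
    model = load_model()  # assumption: your own model loader

    for batch in batches:
        # one pd.Series in, one pd.Series of the same length out
        yield batch.map(model.annotate)  # assumption: model.annotate(text) -> str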