
I have an email column in a dataframe and I want to replace part of it with asterisks. I am unable to figure it out using PySpark functions.

My email column could be something like this:

email_col
abc123@gmail.com
123abc123@yahoo.com

What I want to achieve is this:

mod_email_col
ab**23@gmail.com
12*****23@yahoo.com

So essentially, apart from the first 2 characters and the last 2 characters, I want the remaining part to be replaced by asterisks.

This is what I tried:

from pyspark.sql import functions as F

split_email = F.split(df.email_address, "@")
df = df.withColumn('email_part', split_email.getItem(0))
df = df.withColumn('start', df.email_part.substr(1, 2))   # substr is 1-based in Spark
df = df.withColumn('end', df.email_part.substr(-2, 2))

# this line fails: expr() parses a Spark SQL expression, so Python string
# methods like .index() and len() cannot be used inside it
df.withColumn(
    'masked_part', 
     F.expr("regexp_replace(email_part, email_part[email_part.index(start)+len(start):email_part.index(end)], '*')")
).show(n=5)
– Manas Jani
  • You should split the string at `@` and then have a look at my answer: [substring multiple characters from the last index of a pyspark string column using negative indexing](https://stackoverflow.com/questions/49793479/substring-multiple-characters-from-the-last-index-of-a-pyspark-string-column-usi/49793592#49793592) – pissall Oct 29 '19 at 18:56
  • I tried all of that, but I am unable to figure out how to extract the part between the first 2 and the last 2 characters. I did the split to get the part before the domain, of course. Updated with the code now. – Manas Jani Oct 29 '19 at 18:57
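
For reference, a minimal sketch of the split-plus-negative-index substr idea from the comments (an illustrative assumption, not tested code from the thread; column names follow the question, and it assumes the part before the @ has more than 4 characters, otherwise the first and last pieces overlap):

from pyspark.sql import functions as F

local = F.split(df["email_address"], "@").getItem(0)
domain = F.split(df["email_address"], "@").getItem(1)
df = df.withColumn(
    "mod_email_col",
    F.concat(
        local.substr(1, 2),    # first 2 characters
        F.expr("repeat('*', length(split(email_address, '@')[0]) - 4)"),  # middle masked
        local.substr(-2, 2),   # last 2 characters via negative indexing
        F.lit("@"),
        domain,
    ),
)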

2 Answers


I think you can achieve this with the help of the following regular expression: (?<=.{2})\w+(?=.{2}@)

  • (?<=.{2}): positive lookbehind for 2 preceding characters
  • \w+: one or more word characters
  • (?=.{2}@): positive lookahead for 2 characters followed by a literal @
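
As a quick sanity check outside Spark, the same pattern can be tried with Python's re module (a plain-Python illustration, not part of the Spark job):

import re

pattern = r"(?<=.{2})\w+(?=.{2}@)"
print(re.search(pattern, "abc123@gmail.com").group(0))     # c1
print(re.search(pattern, "123abc123@yahoo.com").group(0))  # 3abc1
print(re.search(pattern, "abcd@test.com"))                 # None: local part too short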

First use regexp_extract to extract this pattern from your string.

from pyspark.sql.functions import regexp_extract, regexp_replace

df = df.withColumn(
    "pattern", 
    regexp_extract("email", r"(?<=.{2})\w+(?=.{2}@)", 0)
)
df.show()
#+-------------------+-------+
#|              email|pattern|
#+-------------------+-------+
#|   abc123@gmail.com|     c1|
#|123abc123@yahoo.com|  3abc1|
#|      abcd@test.com|       |
#+-------------------+-------+

Then use regexp_replace to build a replacement string of asterisks with the same length.

df = df.withColumn(
    "replacement",
    regexp_replace("pattern", r"\w", "*")
)
df.show()
#+-------------------+-------+-----------+
#|              email|pattern|replacement|
#+-------------------+-------+-----------+
#|   abc123@gmail.com|     c1|         **|
#|123abc123@yahoo.com|  3abc1|      *****|
#|      abcd@test.com|       |           |
#+-------------------+-------+-----------+

Next use regexp_replace again on the original email column, using the derived pattern and replacement columns.

To be safe, concat the lookbehind/lookahead from the original pattern when doing the replacement. To do this, we have to use expr in order to pass the column values as parameters.

from pyspark.sql.functions import expr

df = df.withColumn(
    "mod_email_col",
    expr("regexp_replace(email, concat('(?<=.{2})', pattern, '(?=.{2}@)'), replacement)")
)
df.show()
#+-------------------+-------+-----------+-------------------+
#|              email|pattern|replacement|      mod_email_col|
#+-------------------+-------+-----------+-------------------+
#|   abc123@gmail.com|     c1|         **|   ab**23@gmail.com|
#|123abc123@yahoo.com|  3abc1|      *****|12*****23@yahoo.com|
#|      abcd@test.com|       |           |      abcd@test.com|
#+-------------------+-------+-----------+-------------------+

Finally drop the intermediate columns:

df = df.drop("pattern", "replacement")
df.show()
#+-------------------+-------------------+
#|              email|      mod_email_col|
#+-------------------+-------------------+
#|   abc123@gmail.com|   ab**23@gmail.com|
#|123abc123@yahoo.com|12*****23@yahoo.com|
#|      abcd@test.com|      abcd@test.com|
#+-------------------+-------------------+

Note: I added one test case to show that this does nothing if the email address part is 4 characters or fewer.


Update: Here are some ways you can handle edge cases where the email address part is 5 characters or fewer.

The rules I am using:

  • Email address length is more than 5: do the above
  • Email address length is 4 or 5: keep the first and last characters, masking the others with *
  • Email address length is 3 or fewer: mask only the single character before the @

Code:

patA = "regexp_replace(email, concat('(?<=.{2})', pattern, '(?=.{2}@)'), replacement)"
patB = "regexp_replace(email, concat('(?<=.{1})', pattern, '(?=.{1}@)'), replacement)"

from pyspark.sql.functions import regexp_extract, regexp_replace
from pyspark.sql.functions import concat, expr, length, lit, split, when

df.withColumn("address_part", split("email", "@").getItem(0))\
.withColumn(
    "pattern", 
    when(
        length("address_part") > 5, 
        regexp_extract("email", r"(?<=.{2})\w+(?=.{2}@)", 0)
    ).otherwise(
        regexp_extract("email", r"(?<=.{1})\w+(?=.{1}@)", 0)
    )
).withColumn(
    "replacement", regexp_replace("pattern", r"\w", "*")
).withColumn(
    "mod_email_col",
    when(
        length("address_part") > 5, expr(patA)
    ).when(
        length("address_part") > 3, expr(patB)
    ).otherwise(regexp_replace('email', r'\w(?=@)', '*'))
).drop("pattern", "replacement", "address_part").show()
#+-------------------+-------------------+
#|              email|      mod_email_col|
#+-------------------+-------------------+
#|   abc123@gmail.com|   ab**23@gmail.com|
#|123abc123@yahoo.com|12*****23@yahoo.com|
#|     abcde@test.com|     a***e@test.com|
#|      abcd@test.com|      a**d@test.com|
#|        ab@test.com|        a*@test.com|
#|         a@test.com|         *@test.com|
#+-------------------+-------------------+
– pault
  • wow, this is beautiful. Thank you so much! What if I want to include the one with just 4 characters, so like take the 1st and the last character and replace the in-between? – Manas Jani Oct 29 '19 at 19:24
  • @ManasJani in that case use [`when`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.when) to check the length of the replacement string. If it's > 0 do this, otherwise use a different pattern (without testing, I think it's as simple as changing the `{2}`s to `{1}`s). You'll also have to handle the case where the email is only 2 characters long. – pault Oct 29 '19 at 19:26
  • So if I want to keep it dynamic then I would have to use the when condition sort of thing? – Manas Jani Oct 29 '19 at 19:27
  • @ManasJani I posted an update to handle your edge cases – pault Oct 29 '19 at 20:05
  • Thank you so much for this, it really helped me understand regex better. How do I pass a function argument to the expressions above? Like `def mask_email(df, email_col):`? I am getting an error when I pass a value to the `email_col` arg. – Manas Jani Oct 29 '19 at 22:16
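
A hedged sketch of such a wrapper, prompted by the last comment (the function name and signature come from the comment; the body is an assumption layered on the update above, not code from the thread). One pitfall worth naming: when the column name is spliced in with str.format, the literal {2}/{1} regex quantifiers must be doubled to {{2}}/{{1}}, or format will treat them as placeholders and raise an error:

from pyspark.sql.functions import expr, length, regexp_extract, regexp_replace, split, when

def mask_email(df, email_col):
    # doubled braces: {{2}} renders as the literal {2} after .format()
    patA = "regexp_replace({0}, concat('(?<=.{{2}})', pattern, '(?=.{{2}}@)'), replacement)".format(email_col)
    patB = "regexp_replace({0}, concat('(?<=.{{1}})', pattern, '(?=.{{1}}@)'), replacement)".format(email_col)
    return df.withColumn("address_part", split(email_col, "@").getItem(0))\
        .withColumn(
            "pattern",
            when(
                length("address_part") > 5,
                regexp_extract(email_col, r"(?<=.{2})\w+(?=.{2}@)", 0)
            ).otherwise(regexp_extract(email_col, r"(?<=.{1})\w+(?=.{1}@)", 0))
        ).withColumn("replacement", regexp_replace("pattern", r"\w", "*"))\
        .withColumn(
            "mod_email_col",
            when(length("address_part") > 5, expr(patA))
            .when(length("address_part") > 3, expr(patB))
            .otherwise(regexp_replace(email_col, r"\w(?=@)", "*"))
        ).drop("pattern", "replacement", "address_part")

# usage, assuming the email column is named "email":
# df = mask_email(df, "email")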

Your problem can be simplified using some string manipulations (Spark SQL functions: instr, concat, left, repeat, substr):

First find the position of the @ in the email string: pos_at = instr(email_col, '@'); the length of the username part is then pos_at - 1. If we take N=2 as the number of characters to keep, the number of characters to be masked is pos_at - 1 - 2*N. In code:

from pyspark.sql.functions import instr, expr

df = spark.createDataFrame(
        [(e,) for e in ['abc123@gmail.com', '123abc123@yahoo.com', 'abd@gmail.com']]
      , ['email_col']
)

# set N=2 as a parameter in the SQL expression
N = 2

df.withColumn('pos_at', instr('email_col', '@')) \
  .withColumn('new_col', expr("""
        CONCAT(LEFT(email_col,{0}), REPEAT('*', pos_at-1-2*{0}), SUBSTR(email_col, pos_at-{0}))
   """.format(N))).show(truncate=False)
#+-------------------+------+-------------------+
#|email_col          |pos_at|new_col            |
#+-------------------+------+-------------------+
#|abc123@gmail.com   |7     |ab**23@gmail.com   |
#|123abc123@yahoo.com|10    |12*****23@yahoo.com|
#|abd@gmail.com      |4     |abbd@gmail.com     |
#+-------------------+------+-------------------+

Notice the issue with the last row when pos_at - 1 <= 2*N; this case has to be processed separately. If I define the following logic:

if `pos_at - 1 <= 2*N`:   keep the first char and mask the rest
otherwise: keep the original processing routine

the whole processing can be wrapped up in a lambda function with two arguments (column_name and N):

# in the SQL expression, {0} is column_name and {1} is N
mask_email = lambda col_name, N: expr("""

  IF(INSTR({0}, '@') <= {1}*2+1
    , CONCAT(LEFT({0},1), REPEAT('*', INSTR({0}, '@')-2), SUBSTR({0}, INSTR({0}, '@')))
    , CONCAT(LEFT({0},{1}), REPEAT('*', INSTR({0}, '@')-1-2*{1}), SUBSTR({0}, INSTR({0}, '@')-{1}))
  ) as `{0}_masked`

""".format(col_name, N))

df.select('*', mask_email('email_col', 2)).show()
#+-------------------+-------------------+
#|          email_col|   email_col_masked|
#+-------------------+-------------------+
#|   abc123@gmail.com|   ab**23@gmail.com|
#|123abc123@yahoo.com|12*****23@yahoo.com|
#|      abd@gmail.com|      a**@gmail.com|
#+-------------------+-------------------+
– jxc