I am running this UDF through PySpark on EMR, using Spark 3.0.1 with YARN as the cluster manager. How can I make this UDF faster?

I am using the external zipcodes package, and the call to zipcodes.matching is what takes most of the time.

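from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import zipcodes

@udf(returnType=StringType())
def clean_zip(zip):
    try:
        # the cleanup that derives zip_corrected from zip was omitted from the post;
        # this assignment is only a placeholder so that the snippet runs
        zip_corrected = zip
        if len(zipcodes.matching(zip_corrected)) > 0:
            return zip_corrected
        else:
            return ""
    except Exception as x:
        # on failure the UDF prints on the executor and implicitly returns None
        print("Error Occurred in zip udf, Error: " + str(x))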
Xi12
  • to me, it seems this can be done in pure pyspark using `when().otherwise()` (SQL's case when equivalent) – samkart Sep 02 '22 at 07:07
  • What about zipcodes.matching? How can I write this in SQL? – Xi12 Sep 02 '22 at 09:44
  • that is independent from the `try else` block and can be done in an UDF. the heavy lifting of zip corrected creation can be done in pyspark and sent to that UDF. also, i believe you want to use [`zipcodes.is_real()`](https://pypi.org/project/zipcodes/#:~:text=%3E%3E%3E%20print(-,zipcodes.is_real,-(%2706463%27) – samkart Sep 02 '22 at 10:51
  • Will try it and let you know, thx – Xi12 Sep 02 '22 at 12:16
  • yes, I have done the heavy lifting in Spark and am using the UDF only for validation, but still the same issue. Any other alternatives? – Xi12 Sep 03 '22 at 00:38

1 Answer

User-defined functions tend to be slower than built-in Spark functions, because each row has to be serialized between the JVM and a Python worker (see "Spark functions vs UDF performance?").

I suggest loading the zip codes into a dataframe and doing the lookup yourself with a SQL join. The zip dataframe has only ~40K rows, which is easily affordable when working with "big data".
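Conceptually, the join replaces one parser call per row with a membership test against a table that is built once. A minimal plain-Python sketch of that idea, without Spark (the helper name `lookup_zip` and the small `valid_zips` set are made up for illustration):

```python
def lookup_zip(zipcode, valid_zips):
    """Return zipcode if it is a known zip code, else None (like an unmatched left-join row)."""
    return zipcode if zipcode in valid_zips else None


# Hypothetical stand-in for the ~40K-row zip table, built once up front.
valid_zips = frozenset({"32969", "28210", "14639"})

rows = ["79188", "32969", "28210"]
matched = [(z, lookup_zip(z, valid_zips)) for z in rows]
print(matched)
```

Each check is a constant-time hash lookup, which is roughly what Spark does on every executor when it joins against a small table.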

Here's a demo:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import zipcodes

df = spark.read.csv("MOCK_DATA.csv", header='true')
df.show(10)
# +------------+--------------+-------+
# |        city|         state|zipcode|
# +------------+--------------+-------+
# |    Amarillo|         Texas|  79188|
# |  Vero Beach|       Florida|  32969|
# |   Charlotte|North Carolina|  28210|
# |   Rochester|      New York|  14639|
# |   Arlington|      Virginia|  22234|
# | San Antonio|         Texas|  78260|
# |      Austin|         Texas|  78764|
# |Cedar Rapids|          Iowa|  52410|
# |      Dallas|         Texas|  75216|
# |     Raleigh|North Carolina|  27605|
# +------------+--------------+-------+

@udf(returnType=StringType())
def clean_zip(zipcode): 
  try:
    if len(zipcodes.matching(zipcode))>0:
      return zipcode
    else:
      return ""
  except Exception as x:
    print("Error Occurred in zip udf, Error: " + str(x))


df.withColumn('zip_correct', clean_zip(df.zipcode)).show(10)
# +------------+--------------+-------+-----------+
# |        city|         state|zipcode|zip_correct|
# +------------+--------------+-------+-----------+
# |    Amarillo|         Texas|  79188|           |
# |  Vero Beach|       Florida|  32969|      32969|
# |   Charlotte|North Carolina|  28210|      28210|
# |   Rochester|      New York|  14639|      14639|
# |   Arlington|      Virginia|  22234|      22234|
# | San Antonio|         Texas|  78260|      78260|
# |      Austin|         Texas|  78764|      78764|
# |Cedar Rapids|          Iowa|  52410|      52410|
# |      Dallas|         Texas|  75216|      75216|
# |     Raleigh|North Carolina|  27605|      27605|
# +------------+--------------+-------+-----------+

Load the zip codes table into a dataframe zips

# data from mockaroo
# get zips.json.bz2 from the zipcodes repository
# note: the orderBy is optional, since Spark does not need pre-sorted input for a join
zips = spark.read.json("zips.json.bz2").orderBy("zip_code", ascending=False)
zips.count()
# 42724

Left join

# left join
df.join(zips, df.zipcode==zips.zip_code, 'left') \
  .select(df.city, df.state, df.zipcode, zips.zip_code.alias("zip_correct")) \
  .show(10)
# +------------+--------------+-------+-----------+
# |        city|         state|zipcode|zip_correct|
# +------------+--------------+-------+-----------+
# |    Amarillo|         Texas|  79188|       null|
# |  Vero Beach|       Florida|  32969|      32969|
# |   Charlotte|North Carolina|  28210|      28210|
# |   Rochester|      New York|  14639|      14639|
# |   Arlington|      Virginia|  22234|      22234|
# | San Antonio|         Texas|  78260|      78260|
# |      Austin|         Texas|  78764|      78764|
# |Cedar Rapids|          Iowa|  52410|      52410|
# |      Dallas|         Texas|  75216|      75216|
# |     Raleigh|North Carolina|  27605|      27605|
# +------------+--------------+-------+-----------+
user2314737