
I am trying to write PySpark code for the SQL query below:

CREATE TABLE table1 AS
SELECT a.ip_address, a.ip_number, b.ip_start_int, b.ip_end_int, b.post_code_id, b.city, b.region_name, b.two_letter_country
FROM nk_ip_address_check a
JOIN ip_additional_pulse b
  ON a.ip_number BETWEEN b.ip_start_int AND b.ip_end_int

The above query joins the two tables using a "between" condition in the "on" clause. I have written a UDF which does the same, but it seems to be very slow. Is there any way I can write the above query in PySpark code that will give me better performance?

Below is the code that I am using:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def ip_mapping(ip_int):
    # Look up the GeoIP row whose range contains this single IP number
    ip_qry = "select country_code,region_code,city_code,postal_code from de_pulse_ip_pqt " \
             "where ip_start_int < {} and ip_end_int > {}".format(ip_int, ip_int)
    result = spark.sql(ip_qry)
    country_code = result.rdd.map(lambda x: x['country_code']).first()
    return country_code

ip_mapped = udf(ip_mapping, IntegerType())
df_final = df.withColumn("country_code", ip_mapped("ip_int"))

This is very inefficient. Moreover, if I also need region_code, I have to call the UDF again after changing the return value of the ip_mapping function:

df_final = df.withColumn("region_code", ip_mapped("ip_int"))
braj

2 Answers


So, for each IP in your DataFrame, you perform a search in another DataFrame that maps IP ranges to GeoIP enrichment data?

Easy solution -> consider using MaxMind DB - https://github.com/maxmind/GeoIP2-python https://www.maxmind.com/en/home
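
A minimal sketch of what that lookup looks like with the geoip2 package (the .mmdb path and sample IP below are placeholders):

import geoip2.database

# Open a local MaxMind city database (path is a placeholder)
with geoip2.database.Reader('/path/to/GeoIP2-City.mmdb') as reader:
    response = reader.city('203.0.113.42')           # sample IP
    print(response.country.iso_code)                 # two-letter country
    print(response.subdivisions.most_specific.name)  # region name
    print(response.city.name)
    print(response.postal.code)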

Anyway, you should perform the operation once per IP, and return all GeoIP data for a specific IP.

Your ip_mapping function should return a tuple of items (e.g. (country_code, city_code, region_code)).

Your UDF should declare a struct (or array) return schema, and the result of the UDF can then be split into several output columns (see https://stackoverflow.com/a/35323680/5088142 for more info); a sketch follows below.
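
A minimal sketch of that approach, assuming a hypothetical lookup_geoip helper that does the range lookup once per IP and returns all fields in a single call:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, StringType

# Return schema for the UDF: one struct holding all GeoIP fields
geo_schema = StructType([
    StructField("country_code", StringType()),
    StructField("region_code", StringType()),
    StructField("city_code", StringType()),
])

def ip_mapping_all(ip_int):
    # lookup_geoip is a hypothetical helper; it returns all GeoIP fields for one IP
    country_code, region_code, city_code = lookup_geoip(ip_int)
    return (country_code, region_code, city_code)

ip_mapped_all = udf(ip_mapping_all, geo_schema)

# One UDF call per row, then split the struct into separate columns
df_geo = df.withColumn("geo", ip_mapped_all("ip_int"))
df_final = df_geo.select("*",
                         col("geo.country_code").alias("country_code"),
                         col("geo.region_code").alias("region_code"),
                         col("geo.city_code").alias("city_code")).drop("geo")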

Yaron
  • Thanks for answering this. I cannot use the first suggestion, as we already have a verified paid data source. I have modified my UDF, but there is not much improvement in performance. – braj Jan 16 '17 at 08:43

You can define the join condition using between and use it in the join. The example below should work for you.

join_condition = [nk_ip_address_check.ip_number.between(ip_additional_pulse.ip_start_int,
                                                        ip_additional_pulse.ip_end_int)]

nk_ip_address_check.alias('a')\
    .join(ip_additional_pulse.alias('b'), join_condition)\
    .selectExpr("a.ip_address",
                "a.ip_number",
                "b.ip_start_int",
                "b.ip_end_int",
                "b.post_code_id",
                "b.city",
                "b.region_name",
                "b.two_letter_country")
Shantanu Sharma