I am new to Spark and Python, so any help is appreciated.

I have a UDF and have created a Spark DataFrame with US zip codes, latitudes, and longitudes.

UDF:

import math

def distance(origin, destination):
    lat1, lon1 = origin
    lat2, lon2 = destination
    radius = 6371 # km
    dlat = math.radians(lat2-lat1)
    dlon = math.radians(lon2-lon1)
    a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) \
        * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    d = radius * c
    return d

sample UDF output:

distance((101,121),(-121,-212)) 

15447.812243421227

dataframe:

zip=spark.read.option("sep", ",").csv('wasb://hdiazurepoc@dsazurepoc.blob.core.windows.net/main/zip.txt')
zip1=zip.select(zip._c0,zip._c1.cast("Double"),zip._c2.cast("Double"))

Sample zip1 data:

zip1.first()        

Row(_c0=u'00601', _c1=18.180555, _c2=-66.749961)

Now I am trying to pass the latitude and longitude from the DataFrame zip1 to the UDF distance, but I get the error "a float is required". I believe the UDF is not receiving the values from the DataFrame columns and is instead treating each column as a single constant value, which causes the error below.

z=zip1.select(distance((zip1._c1,100.23),(zip1._c2,-99.21)))

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 5, in distance
TypeError: a float is required

Please let me know the right way to pass the df fields to udf.

vaira

1 Answer

I'm not sure what data schema you have, but the following example shows the right way to use a UDF for the example in your question.

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import math

def distance(origin, destination):
    lat1, lon1 = origin
    lat2, lon2 = destination
    radius = 6371 # km
    dlat = math.radians(lat2-lat1)
    dlon = math.radians(lon2-lon1)
    a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) \
    * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    d = radius * c
    return d

df = spark.createDataFrame([([101, 121], [-121, -212])], ["origin", "destination"])
filter_udf = udf(distance, DoubleType())
df.withColumn("distance", filter_udf(df.origin, df.destination)).show()

+----------+------------+------------------+
|    origin| destination|          distance|
+----------+------------+------------------+
|[101, 121]|[-121, -212]|15447.812243421227|
+----------+------------+------------------+
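
In the question's DataFrame the latitude and longitude are separate double columns (`_c1`, `_c2`) rather than array columns, so a variant that takes four scalar arguments may fit better. The following is only a sketch under that assumption: the names `haversine_km`, `with_distance_to`, and `distance_km` are made up for illustration, and the destination is assumed to be a fixed point given as plain Python floats. Constants are wrapped in `lit()` because every argument passed to a UDF has to be a Column.

```python
import math


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two points given in degrees."""
    # Spark passes NULL column values to a Python UDF as None;
    # return None so the result column is NULL for those rows.
    if None in (lat1, lon1, lat2, lon2):
        return None
    radius = 6371.0  # km, same value as in the code above
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
         * math.sin(dlon / 2) ** 2)
    return radius * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))


def with_distance_to(df, lat_col, lon_col, dest_lat, dest_lon):
    """Return df with a 'distance_km' column to a fixed destination.

    Hypothetical helper: lat_col and lon_col are column names,
    dest_lat and dest_lon are plain Python floats.
    """
    # Imported here so haversine_km can be used and tested without Spark.
    from pyspark.sql.functions import col, lit, udf
    from pyspark.sql.types import DoubleType

    haversine_udf = udf(haversine_km, DoubleType())
    # Column values come from the DataFrame; lit() turns the constants
    # into Columns so they can be passed alongside them.
    return df.withColumn(
        "distance_km",
        haversine_udf(col(lat_col), col(lon_col), lit(dest_lat), lit(dest_lon)),
    )
```

With the question's DataFrame this would be called as `with_distance_to(zip1, "_c1", "_c2", dest_lat, dest_lon).show()`, where `dest_lat` and `dest_lon` are whatever fixed point you want to measure to.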
Matteo Guarnerio
chilun

@vaira Do you have any problem with the code above? If the answer is what you want, please accept it as the answer, thanks. – chilun Dec 18 '17 at 09:05