I am new to Spark and Python, so any help is appreciated.
I have a UDF and have created a Spark DataFrame with US zip codes, latitude and longitude.
UDF:
import math
def distance(origin, destination):
    lat1, lon1 = origin
    lat2, lon2 = destination
    radius = 6371 # km
    dlat = math.radians(lat2-lat1)
    dlon = math.radians(lon2-lon1)
    a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) \
        * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    d = radius * c
    return d
Sample UDF output:
distance((101,121),(-121,-212))
15447.812243421227
DataFrame:
zip=spark.read.option("sep", ",").csv('wasb://hdiazurepoc@dsazurepoc.blob.core.windows.net/main/zip.txt')
zip1=zip.select(zip._c0,zip._c1.cast("Double"),zip._c2.cast("Double"))
Sample zip1 data:
zip1.first()
Row(_c0=u'00601', _c1=18.180555, _c2=-66.749961)
Now I am trying to pass the latitude and longitude from the DataFrame zip1 to the UDF distance, but I get the error "a float is required". I believe the UDF is not receiving the row values from the DataFrame columns; instead it is reading each column as a constant value, which is why I get the error below.
z=zip1.select(distance((zip1._c1,100.23),(zip1._c2,-99.21)))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 5, in distance
TypeError: a float is required
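As far as I can tell, the same kind of error can be reproduced without Spark. The sketch below is only an illustration: FakeColumn and try_radians are hypothetical names, and FakeColumn merely imitates the fact that subtracting two DataFrame columns yields another expression object rather than a number. math.radians accepts a float but rejects such an object with a TypeError (the exact wording of the message depends on the Python version).

```python
import math


class FakeColumn(object):
    """Hypothetical stand-in for a Spark Column: an expression, not a number."""

    def __sub__(self, other):
        # Like a Spark Column, subtraction builds another expression object
        return FakeColumn()


def try_radians(value):
    """Return the message of the TypeError math.radians raises, or None if it succeeds."""
    try:
        math.radians(value)
    except TypeError as exc:
        return str(exc)
    return None


# A plain float converts without any error, so this prints None
print(try_radians(18.180555 - 100.23))
# An expression object is rejected; this prints the TypeError message
print(try_radians(FakeColumn() - FakeColumn()))
```

If this reading is right, the problem is that distance is being called once, on the driver, with column expressions, rather than once per row with the float values.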
Please let me know the right way to pass the DataFrame columns to the UDF.
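One direction that might work, sketched here under assumptions rather than confirmed: register the function with pyspark.sql.functions.udf so that Spark calls it once per row, and wrap the constant coordinates in lit so they can be passed alongside columns. The names distance_km and with_distance are hypothetical, the function takes four flat arguments instead of two tuples, and _c1 and _c2 are assumed to be latitude and longitude as in zip1.

```python
import math


def distance_km(lat1, lon1, lat2, lon2):
    """Haversine distance in km; same arithmetic as distance(), flat arguments."""
    # A failed cast leaves a null in the column, which arrives here as None
    if None in (lat1, lon1, lat2, lon2):
        return None
    radius = 6371  # km
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
         * math.sin(dlon / 2) ** 2)
    return radius * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))


def with_distance(df, dest_lat, dest_lon):
    """Add a distance_km column, assuming _c1 is latitude and _c2 is longitude."""
    # Imported here so distance_km can be tried without Spark installed
    from pyspark.sql.functions import col, lit, udf
    from pyspark.sql.types import DoubleType

    # udf() makes Spark call distance_km once per row with plain Python values
    distance_udf = udf(distance_km, DoubleType())
    # lit() wraps a Python constant so it can be passed alongside columns
    return df.withColumn(
        "distance_km",
        distance_udf(col("_c1"), col("_c2"), lit(dest_lat), lit(dest_lon)),
    )
```

With the DataFrame above this would be called as z = with_distance(zip1, 35.0, -99.21), where the destination coordinates are placeholders. The function only builds the new column; nothing is computed until an action such as z.show() runs.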