I have written the following pandas_udf to calculate the haversine distance in PySpark:
import math

import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType


def haversine(witness_lat: pd.Series, witness_lon: pd.Series, beacon_lat: pd.Series, beacon_lon: pd.Series) -> pd.Series:
    # Called once per row from the apply below
    if None in [witness_lat, witness_lon, beacon_lat, beacon_lon]:
        return None
    else:
        lon1 = witness_lon
        lat1 = witness_lat
        lon2 = beacon_lon
        lat2 = beacon_lat
        # Convert degrees to radians
        lon1, lat1, lon2, lat2 = map(math.radians, [lon1, lat1, lon2, lat2])
        dlon = lon2 - lon1
        dlat = lat2 - lat1
        # Haversine formula
        a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
        c = 2 * np.arcsin(np.sqrt(a))
        # Earth radius in metres
        m = 6367000 * c
        return m


@pandas_udf("float", PandasUDFType.SCALAR)
def udf_calc_distance(st_y_witness, st_x_witness, st_y_transmitter, st_x_transmitter):
    distance_df = pd.DataFrame({'st_y_witness': st_y_witness, 'st_x_witness': st_x_witness, 'st_y_transmitter': st_y_transmitter, 'st_x_transmitter': st_x_transmitter})
    # Apply the haversine function row by row
    distance_df['distance'] = distance_df.apply(lambda x: haversine(x['st_y_witness'], x['st_x_witness'], x['st_y_transmitter'], x['st_x_transmitter']), axis=1)
    return distance_df['distance']
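As an aside, the row-wise apply may not be strictly necessary, since the formula only uses element-wise operations. Below is a minimal sketch of the same calculation written to work on whole Series at once with NumPy. The names haversine_vectorized and EARTH_RADIUS_M are my own, and the sketch assumes the inputs are float Series, so missing values show up as NaN rather than None.

```python
import numpy as np
import pandas as pd

EARTH_RADIUS_M = 6_367_000  # same radius as in the code above, in metres


def haversine_vectorized(lat1: pd.Series, lon1: pd.Series,
                         lat2: pd.Series, lon2: pd.Series) -> pd.Series:
    """Great-circle distance in metres, computed element-wise on whole Series."""
    # np.radians works on a float Series directly; NaN inputs stay NaN.
    lat1, lon1, lat2, lon2 = (np.radians(s) for s in (lat1, lon1, lat2, lon2))
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    return EARTH_RADIUS_M * 2 * np.arcsin(np.sqrt(a))


# Two pairs of points: one degree of longitude apart on the equator, and identical points.
distances = haversine_vectorized(
    pd.Series([0.0, 52.0]), pd.Series([0.0, 4.0]),
    pd.Series([0.0, 52.0]), pd.Series([1.0, 4.0]),
)
```

Because every step is a NumPy ufunc or plain arithmetic, the result is a Series with one distance per input row.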
The UDF runs correctly and gives me the answer I expect; however, I get the deprecation warning shown below.
UserWarning: In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for pandas UDF instead of specifying pandas UDF type which will be deprecated in the future releases. See SPARK-28264 for more details.
warnings.warn(
I've looked at the latest pandas_udf documentation on Databricks (https://docs.databricks.com/spark/latest/spark-sql/udf-python-pandas.html), but I'm not sure how to use the type hints with the apply-based approach. I set up my code based on other examples I've seen on Stack Overflow, such as "Passing multiple columns in Pandas UDF PySpark", which follow the format that will be deprecated.
Thank you for the help!