
I'm trying to get the country name from latitude and longitude as input, so I'm using the Nominatim API. When I wrap the call in a regular UDF it works, but when I try to use pandas_udf I get the following error:

An exception was thrown from a UDF: 'RuntimeError: Result vector from pandas_udf was not the required length: expected 1, got 2'

This is my code:

import requests
import pandas as pd

from pyspark.sql.functions import pandas_udf, PandasUDFType, udf

@pandas_udf("string", PandasUDFType.SCALAR)
def country_name(lat, lon):
    url = f"https://nominatim.openstreetmap.org/reverse?format=json&lat={lat}&lon={lon}"
    response = requests.get(url)
    data = response.json()
    if 'error' in data:
        return 'NA'
    else:
        return data['address']['country']

df = spark.createDataFrame([(40.730610, -73.935242)], ["lat", "lon"])
df = df.withColumn("country", country_name(df["lat"], df["lon"]))
df.show()

As I said, if I use a regular UDF it works; the problem only appears when I use pandas_udf.
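
For reference, the working regular-UDF version is roughly the same function with the udf decorator instead; each call then receives plain scalars rather than Series:

from pyspark.sql.functions import udf

# Row-at-a-time UDF: lat and lon arrive as plain Python scalars,
# so returning a single string per call is what Spark expects.
@udf("string")
def country_name(lat, lon):
    url = f"https://nominatim.openstreetmap.org/reverse?format=json&lat={lat}&lon={lon}"
    data = requests.get(url).json()
    return 'NA' if 'error' in data else data['address']['country']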


1 Answer


Refer to the "Series to Scalar" section of the pandas_udf API guide.

The error occurs because a scalar pandas_udf receives pandas Series and must return a result of the same length as its input; a bare Python string is measured by its character count, which is why the error reports "expected 1, got 2" (the two-character string 'NA'). Change your code as follows, per the Series-to-Scalar sample in the guide above. The changes are marked with the comment # Changed.

import requests
import pandas as pd

from pyspark.sql.functions import pandas_udf

@pandas_udf("string") # Changed
def country_name(lat: pd.Series, lon: pd.Series) -> str: # Changed
    url = f"https://nominatim.openstreetmap.org/reverse?format=json&lat={lat.iloc[0]}&lon={lon.iloc[0]}" # Changed
    response = requests.get(url)
    data = response.json()
    if 'error' in data:
        return 'NA'
    else:
        return data['address']['country']

df = spark.createDataFrame([(40.730610, -73.935242)], ["lat", "lon"])
# df = df.withColumn("country", country_name(df["lat"], df["lon"])) # Changed
df = df.select(country_name(df["lat"], df["lon"])) # Changed
df.show()

However, this strategy only works if the function is expected to return a single scalar value for the entire input series, i.e. when it behaves like an aggregation.
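
Since a Series-to-Scalar pandas_udf behaves like an aggregate function, you can also apply it per group with groupBy(...).agg(...). A minimal sketch, assuming a hypothetical "city" grouping column:

df2 = spark.createDataFrame(
    [("nyc", 40.730610, -73.935242), ("ottawa", 45.0, -75.0)],
    ["city", "lat", "lon"],
)
# country_name here is the Series-to-Scalar version defined above;
# it runs once per group and returns one scalar per group.
df2.groupBy("city").agg(country_name(df2["lat"], df2["lon"]).alias("country")).show()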

With real data you would expect vectorization: for a given lat-lon DataFrame, you need a series of results. For that, the API would have to accept a list of lat-lon pairs. If it does not, then, as you can see in the following code, you have to call the API for each (lat, lon) value, defeating the purpose of the vectorization achieved through pandas_udf.

import requests
import pandas as pd

from pyspark.sql.functions import pandas_udf

def call_url(row):
  url = f"https://nominatim.openstreetmap.org/reverse?format=json&lat={row['lat']}&lon={row['lon']}"
  response = requests.get(url)
  data = response.json()
  if 'error' in data:
    return 'NA'
  else:
    return data['address']['country']

@pandas_udf("string")
def country_name(lat: pd.Series, lon: pd.Series) -> pd.Series:
  lat_lon_df = pd.DataFrame({"lat": lat, "lon": lon})
  lat_lon_df["country"] = lat_lon_df.apply(call_url, axis=1)
  return lat_lon_df["country"]

df = spark.createDataFrame([(40.730610, -73.935242), (45.0, -75.0)], ["lat", "lon"])
df = df.withColumn("country", country_name(df["lat"], df["lon"]))
df.show()

Output:

+--------+----------+-------------+
|     lat|       lon|      country|
+--------+----------+-------------+
|40.73061|-73.935242|United States|
|    45.0|     -75.0|       Canada|
+--------+----------+-------------+
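
If many rows share the same coordinates, one mitigation (a sketch of an idea, not part of the original approach) is to resolve each distinct (lat, lon) pair only once per batch and reuse the cached result. The User-Agent value below is a placeholder; Nominatim's usage policy asks clients to identify themselves:

import requests
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("string")
def country_name_cached(lat: pd.Series, lon: pd.Series) -> pd.Series:
    # Resolve each distinct (lat, lon) pair once per Arrow batch,
    # then map the cached result back onto every row.
    cache = {}
    def lookup(row):
        key = (row["lat"], row["lon"])
        if key not in cache:
            url = f"https://nominatim.openstreetmap.org/reverse?format=json&lat={key[0]}&lon={key[1]}"
            # "my-app/0.1" is a placeholder; use a real identifier.
            data = requests.get(url, headers={"User-Agent": "my-app/0.1"}).json()
            cache[key] = 'NA' if 'error' in data else data['address']['country']
        return cache[key]
    return pd.DataFrame({"lat": lat, "lon": lon}).apply(lookup, axis=1)

Note the cache only lives for one Arrow batch on one executor; a stronger version would dropDuplicates(["lat", "lon"]) on the DataFrame, resolve the distinct pairs, and join the results back.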
  • It worked. Do you have any advice for a large df? My real data has about 4 million rows, but I wanted to try something easy first. I suppose pandas_udf is enough, right? According to the documentation, pandas UDFs are faster than regular UDFs – BryC Jan 15 '23 at 20:21
  • `pandas_udf` is vectorized, but the underlying processing/logic must support that too. In this case, can you make an API call that sends bulk (lat, lon) pairs and receives multiple results at once? Individual API calls nullify the vectorization effort. – Azhar Khan Jan 16 '23 at 00:56
  • It seems the API doesn't allow it. I tried caching some rows, but maybe that is not a good approach. I also tried spark.pandas and used `apply` on the entire df; after 8-9 minutes it finished, but when I converted it to a Spark df and tried to display it, it ran endlessly – BryC Jan 16 '23 at 14:44
  • Can you process the lat, lon values lazily, i.e. delay the lookups until they are actually required? You have about 4 million records. If you call the API that many times, either the API will underperform, or, if it implements a rate-limiter pattern, it will block you after a certain number of requests per second or reject requests beyond a maximum threshold. – Azhar Khan Jan 17 '23 at 00:42