
I'm using Python Transforms in Palantir Foundry to run an algorithm that relies on in-memory/non-Spark libraries, and I want it to scale automatically and work in Spark (not pandas). Since the code is hard to write directly against Spark, I'd like to develop and test it locally, yet use the same code in PySpark later. How do I do this?

For a concrete example, I want to calculate the area of a GeoJSON column that contains a polygon. This requires libraries that aren't native to Spark (shapely and pyproj). I know the best way (performance-wise) is to use a pandas_udf (otherwise known as a streaming UDF or vectorized UDF). But after reading a couple of guides, specifically Introducing Pandas UDF for PySpark, pandas user-defined functions and Modeling at Scale with Pandas UDFs w/code examples, it's still challenging to debug and get working: it seems I can't set breakpoints, and there isn't a first-class way to log/print from inside the UDF.

The actual dataframe has millions of rows (one per polygon), but for simplicity I want to test locally with a small dataframe and have it scale to the larger dataset later:

df = spark.createDataFrame(
    [
        ("AFG", "{\"type\":\"Polygon\",\"coordinates\":[[[61.210817,35.650072],[62.230651,35.270664],[62.984662,35.404041],[63.193538,35.857166],[63.982896,36.007957],[64.546479,36.312073],[64.746105,37.111818],[65.588948,37.305217],[65.745631,37.661164],[66.217385,37.39379],[66.518607,37.362784],[67.075782,37.356144],[67.83,37.144994],[68.135562,37.023115],[68.859446,37.344336],[69.196273,37.151144],[69.518785,37.608997],[70.116578,37.588223],[70.270574,37.735165],[70.376304,38.138396],[70.806821,38.486282],[71.348131,38.258905],[71.239404,37.953265],[71.541918,37.905774],[71.448693,37.065645],[71.844638,36.738171],[72.193041,36.948288],[72.63689,37.047558],[73.260056,37.495257],[73.948696,37.421566],[74.980002,37.41999],[75.158028,37.133031],[74.575893,37.020841],[74.067552,36.836176],[72.920025,36.720007],[71.846292,36.509942],[71.262348,36.074388],[71.498768,35.650563],[71.613076,35.153203],[71.115019,34.733126],[71.156773,34.348911],[70.881803,33.988856],[69.930543,34.02012],[70.323594,33.358533],[69.687147,33.105499],[69.262522,32.501944],[69.317764,31.901412],[68.926677,31.620189],[68.556932,31.71331],[67.792689,31.58293],[67.683394,31.303154],[66.938891,31.304911],[66.381458,30.738899],[66.346473,29.887943],[65.046862,29.472181],[64.350419,29.560031],[64.148002,29.340819],[63.550261,29.468331],[62.549857,29.318572],[60.874248,29.829239],[61.781222,30.73585],[61.699314,31.379506],[60.941945,31.548075],[60.863655,32.18292],[60.536078,32.981269],[60.9637,33.528832],[60.52843,33.676446],[60.803193,34.404102],[61.210817,35.650072]]]}"),  
        ("ALB", "{\"type\":\"Polygon\",\"coordinates\":[[[20.590247,41.855404],[20.463175,41.515089],[20.605182,41.086226],[21.02004,40.842727],[20.99999,40.580004],[20.674997,40.435],[20.615,40.110007],[20.150016,39.624998],[19.98,39.694993],[19.960002,39.915006],[19.406082,40.250773],[19.319059,40.72723],[19.40355,41.409566],[19.540027,41.719986],[19.371769,41.877548],[19.304486,42.195745],[19.738051,42.688247],[19.801613,42.500093],[20.0707,42.58863],[20.283755,42.32026],[20.52295,42.21787],[20.590247,41.855404]]]}"),
    ],  # more countries available from https://raw.githubusercontent.com/johan/world.geo.json/34c96bba9c07d2ceb30696c599bb51a5b939b20f/countries.geo.json
    ["country", "geometry"]
)

Given that the geometry column actually holds GeoJSON, how can I calculate the area in square meters using a sound GIS approach? For example, using the methods outlined in these questions:

Calculate Polygon area in planar units (e.g. square-meters) in Shapely

How do I get the area of a GeoJSON polygon with Python

How to calculate the area of a polygon on the earth's surface using python?


1 Answer


The way to think about pandas_udfs is that you are writing logic to be applied to a pandas Series: Spark hands your function a batch of column values as a Series, your transformation is applied automatically to every row in that batch, and you return a Series of the same length.
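For instance (a toy sketch with made-up values), the Series-to-Series pattern the UDF will use is just an ordinary pandas .apply:

import pandas as pd

# a scalar pandas_udf receives a batch of column values as a pandas Series;
# the body is an ordinary Series -> Series transformation
s = pd.Series(["a", "bb", "ccc"])  # made-up values
print(s.apply(len))  # 1, 2, 3 -- applied automatically to every row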

If you want to develop this locally, you can take a much smaller sample of your data (as you did), store the column in a pandas Series, and get the logic working there:

from shapely.geometry import Polygon
import json
from pyproj import Geod

# select just the column the pandas_udf will operate on
pdf = df.select("geometry").toPandas()

# convert the single-column dataframe to a pandas Series
# (note: .ix is removed in modern pandas; use .iloc instead)
pdf_geom_raw = pdf.iloc[:, 0]

# parse each GeoJSON string into a dict
pdf_geom = pdf_geom_raw.apply(json.loads)

# geodesic area calculation using non-Spark libraries (shapely + pyproj)
def get_area(shape):
    geod = Geod(ellps="WGS84")
    poly = Polygon(shape["coordinates"][0])
    # geometry_area_perimeter returns (area, perimeter); area is signed
    area = abs(geod.geometry_area_perimeter(poly)[0])
    return area

pdf_geom = pdf_geom.apply(get_area)
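A side benefit of this local loop: because it is plain Python, ordinary debugging works, which addresses the pain point from the question. For example:

# plain Python, so print and breakpoints work here,
# unlike inside a pandas_udf running on the cluster
print(pdf_geom.head())
# import pdb; pdb.set_trace()  # or set a breakpoint in your IDE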

Here you can also run entirely locally (without Spark) by replacing pdf = df.select("geometry").toPandas() with pdf = pd.read_csv("geo.csv") (after import pandas as pd).
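A minimal sketch of that fully local loop, assuming a hypothetical geo.csv with "country" and "geometry" columns like the dataframe above:

import json

import pandas as pd
from pyproj import Geod
from shapely.geometry import Polygon

def get_area(shape):
    geod = Geod(ellps="WGS84")
    poly = Polygon(shape["coordinates"][0])
    return abs(geod.geometry_area_perimeter(poly)[0])

# assumes a local geo.csv whose "geometry" column holds
# the GeoJSON strings shown in the question
pdf = pd.read_csv("geo.csv")
pdf["area_square_meters"] = pdf["geometry"].apply(json.loads).apply(get_area)
print(pdf[["country", "area_square_meters"]])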

Now that it works locally, you can copy-paste the same code into your pandas_udf:

from shapely.geometry import Polygon
import json
from pyproj import Geod
from pyspark.sql.functions import pandas_udf, PandasUDFType

# scalar pandas_udf: receives a pandas Series per batch, returns a Series
@pandas_udf('double', PandasUDFType.SCALAR)
def geodesic_polygon_area(pdf_geom):
    # exactly the logic developed locally above
    pdf_geom = pdf_geom.apply(json.loads)

    def get_area(shape):
        geod = Geod(ellps="WGS84")
        poly = Polygon(shape["coordinates"][0])
        area = abs(geod.geometry_area_perimeter(poly)[0])
        return area

    pdf_geom = pdf_geom.apply(get_area)
    return pdf_geom

df = df.withColumn('area_square_meters', geodesic_polygon_area(df.geometry))

When running the code:

>>> df.show()
+-------+--------------------+--------------------+
|country|            geometry|  area_square_meters|
+-------+--------------------+--------------------+
|    AFG|{"type":"Polygon"...|6.522700837770404E11|
|    ALB|{"type":"Polygon"...|2.969479517410540...|
+-------+--------------------+--------------------+
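One note: PandasUDFType.SCALAR is the Spark 2.x style and is deprecated in Spark 3.x, where the same UDF is usually written with Python type hints. A sketch of the equivalent (same logic, assuming Spark 3.x):

import json

import pandas as pd
from pyproj import Geod
from pyspark.sql.functions import pandas_udf
from shapely.geometry import Polygon

# Spark 3.x style: the Series -> Series type hints replace PandasUDFType.SCALAR
@pandas_udf('double')
def geodesic_polygon_area(pdf_geom: pd.Series) -> pd.Series:
    geod = Geod(ellps="WGS84")

    def get_area(geojson_str):
        shape = json.loads(geojson_str)
        poly = Polygon(shape["coordinates"][0])
        return abs(geod.geometry_area_perimeter(poly)[0])

    return pdf_geom.apply(get_area)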