
We are using a PySpark function on a DataFrame, and it throws an error. The error is most likely caused by a faulty row in the DataFrame.

The schema of the DataFrame looks like this:

root
 |-- geo_name: string (nullable = true)
 |-- geo_latitude: double (nullable = true)
 |-- geo_longitude: double (nullable = true)
 |-- geo_bst: integer (nullable = true)
 |-- geo_bvw: integer (nullable = true)
 |-- geometry_type: string (nullable = true)
 |-- geometry_polygon: string (nullable = true)
 |-- geometry_multipolygon: string (nullable = true)
 |-- polygon: geometry (nullable = false)

I converted the CSV string column "geometry_polygon" to the geometry-typed column "polygon" like this:

station_groups_gdf.createOrReplaceTempView("station_gdf")
spatial_station_groups_gdf = spark_sedona.sql("SELECT *, ST_PolygonFromText(station_gdf.geometry_polygon, ',') AS polygon FROM station_gdf")

Example input data looks like this:

-RECORD 0-------------------------------------
 geo_name              | Neckarkanal
 geo_latitude          | 49.486697
 geo_longitude         | 8.504944
 geo_bst               | 0
 geo_bvw               | 0
 geometry_type         | Polygon
 geometry_polygon      | 8.4937, 49.4892, ...
 geometry_multipolygon | null
 polygon               | POLYGON ((8.4937 ...

The error occurs just by calling:

df.show()

The error:

java.lang.IllegalArgumentException: Points of LinearRing do not form a closed linestring
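This exception comes from the underlying JTS library: the first and last coordinate of a polygon ring must be identical, otherwise the ring is not "closed". GEOS, which backs shapely, enforces the same closure rule when parsing WKT, so the behavior can be sketched without Spark (the coordinates below are made up for illustration):

```python
from shapely import wkt

# A ring whose last point does not repeat the first is rejected at parse
# time, analogous to Sedona/JTS raising
# "Points of LinearRing do not form a closed linestring".
try:
    wkt.loads('POLYGON ((8.49 49.49, 8.50 49.49, 8.50 49.50))')
except Exception as exc:
    print('rejected:', exc)

# Repeating the first coordinate closes the ring and parsing succeeds.
closed = wkt.loads('POLYGON ((8.49 49.49, 8.50 49.49, 8.50 49.50, 8.49 49.49))')
print(closed.is_valid)  # True
```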


To pinpoint the faulty rows, we would like to iterate through the DataFrame and apply a function that deletes invalid values. Something like this:

dataframe.where(dataframe.polygon == valid).show()
dataframe.filter(dataframe.polygon == valid).show()

Do you know the best way to iterate row by row and delete invalid values, without evaluating the PySpark DataFrame in its entirety (which triggers the error and aborts the job)?

1 Answer


Since you have a DataFrame, a pandas_udf check should work well. The function itself may not look very pretty, but it works. In the example below, "geo_name" = X has invalid polygon input, and in the output no polygon is created for that row.

Input:

df = spark_sedona.createDataFrame(
    [('A', '-74, 40, -73, 39, -75, 38, -74, 40'),
     ('X', '-11'),
     ('Y', None),
     ('B', '-33, 50, -30, 38, -40, 27, -33, 50')],
    ['geo_name', 'geometry_polygon']
)

Script:

from pyspark.sql import functions as F
import pandas as pd
from shapely.geometry import Polygon

@F.pandas_udf('string')
def nullify_invalid_polygon(ser: pd.Series) -> pd.Series:
    def nullify(s):
        try:
            # Pair up the flat "x1, y1, x2, y2, ..." string into (x, y) tuples
            p_shell = list(zip(*[iter(map(float, s.split(',')))] * 2))
            return s if p_shell != [] and Polygon(p_shell).is_valid else None
        except (ValueError, AttributeError):
            # Unparsable string or None input - treat as invalid
            return None
    return ser.map(nullify)

df = df.withColumn('geometry_polygon', nullify_invalid_polygon('geometry_polygon'))

df.createOrReplaceTempView("station_gdf")
df = spark_sedona.sql("SELECT *, CASE WHEN isnull(geometry_polygon) THEN null ELSE ST_PolygonFromText(geometry_polygon, ',') END AS polygon FROM station_gdf")

Result:

df.printSchema()
# root
#  |-- geo_name: string (nullable = true)
#  |-- geometry_polygon: string (nullable = true)
#  |-- polygon: geometry (nullable = true)

df.show(truncate=0)
# +--------+----------------------------------+------------------------------------------+
# |geo_name|geometry_polygon                  |polygon                                   |
# +--------+----------------------------------+------------------------------------------+
# |A       |-74, 40, -73, 39, -75, 38, -74, 40|POLYGON ((-74 40, -73 39, -75 38, -74 40))|
# |X       |null                              |null                                      |
# |Y       |null                              |null                                      |
# |B       |-33, 50, -30, 38, -40, 27, -33, 50|POLYGON ((-33 50, -30 38, -40 27, -33 50))|
# +--------+----------------------------------+------------------------------------------+

The idea is to apply Polygon.is_valid. But since in a few cases the Polygon constructor throws an error instead of returning an invalid geometry, the check is wrapped in try...except.
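For illustration, here is one case where the constructor raises and one where it merely reports invalidity (the coordinates are made up):

```python
from shapely.geometry import Polygon

# Too few coordinate pairs: the constructor raises instead of returning
# an invalid geometry - this is why the UDF wraps it in try...except.
try:
    Polygon([(0, 0), (1, 1)])
except ValueError as exc:
    print('raised:', exc)

# A self-intersecting "bowtie" constructs fine but reports is_valid == False.
bowtie = Polygon([(0, 0), (2, 2), (2, 0), (0, 2), (0, 0)])
print(bowtie.is_valid)  # False
```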

  • Implemented your solution on my data frame and it worked very well! Thanks a lot! Still trying to wrap my head around your solution though. – pi_janes Oct 27 '22 at 15:47
  • I've replaced the previous version with a simpler one. You may prefer this one. – ZygD Oct 28 '22 at 18:24