We are applying a PySpark function to a DataFrame, and it throws an error. The error is most likely caused by a faulty row in the DataFrame.
The schema of the DataFrame looks like this:
root
|-- geo_name: string (nullable = true)
|-- geo_latitude: double (nullable = true)
|-- geo_longitude: double (nullable = true)
|-- geo_bst: integer (nullable = true)
|-- geo_bvw: integer (nullable = true)
|-- geometry_type: string (nullable = true)
|-- geometry_polygon: string (nullable = true)
|-- geometry_multipolygon: string (nullable = true)
|-- polygon: geometry (nullable = false)
We converted the CSV column "geometry_polygon" to the geometry-typed column "polygon" like this:
station_groups_gdf.createOrReplaceTempView("station_gdf")
spatial_station_groups_gdf = spark_sedona.sql("SELECT *, ST_PolygonFromText(station_gdf.geometry_polygon, ',') AS polygon FROM station_gdf")
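Here, spark_sedona is a Sedona-enabled Spark session. For context, a minimal setup sketch (assuming Apache Sedona 1.4+ and its SedonaContext API; the app name is arbitrary):

from sedona.spark import SedonaContext

# Build a Spark session with the Sedona SQL functions
# (ST_PolygonFromText, ST_GeomFromText, ...) registered.
config = SedonaContext.builder().appName("station-polygons").getOrCreate()
spark_sedona = SedonaContext.create(config)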
Example input data looks like this:
-RECORD 0-------------------------------------
geo_name | Neckarkanal
geo_latitude | 49.486697
geo_longitude | 8.504944
geo_bst | 0
geo_bvw | 0
geometry_type | Polygon
geometry_polygon | 8.4937, 49.4892, ...
geometry_multipolygon | null
polygon | POLYGON ((8.4937 ...
Since ST_PolygonFromText is evaluated lazily, the error only surfaces once an action forces evaluation, e.g. simply calling:
df.show()
The error:
java.lang.IllegalArgumentException: Points of LinearRing do not form a closed linestring
To pinpoint these rows, we would like to iterate through the DataFrame and apply a function that filters out the invalid values. Something like this (pseudocode, valid is not an actual column):
dataframe.where(dataframe.polygon == valid).show()
dataframe.filter(dataframe.polygon == valid).show()
Do you know the best way to go through the DataFrame row by row and drop the invalid values, without triggering evaluation of the DataFrame in its entirety (which raises the error above and aborts the job)?
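For reference, this is the kind of pre-filter we have in mind: a minimal sketch that checks closedness on the raw coordinate string before ST_PolygonFromText ever parses it. It assumes the string strictly alternates longitude and latitude values separated by commas; is_closed_ring is our own helper name, not a Sedona function.

from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType

# A ring is closed when its first and last coordinate pairs are identical.
# We check that on the raw CSV string, so rows that cannot form a closed
# linestring never reach the geometry constructor.
@F.udf(returnType=BooleanType())
def is_closed_ring(coords):
    if coords is None:
        return False
    values = [v.strip() for v in coords.split(",")]
    # A closed ring needs at least 4 points, i.e. 8 comma-separated values.
    if len(values) < 8 or len(values) % 2 != 0:
        return False
    return values[0] == values[-2] and values[1] == values[-1]

valid_gdf = station_groups_gdf.filter(is_closed_ring(F.col("geometry_polygon")))
valid_gdf.createOrReplaceTempView("station_gdf")
spark_sedona.sql(
    "SELECT *, ST_PolygonFromText(geometry_polygon, ',') AS polygon FROM station_gdf"
).show()

Because Spark transformations are lazy, the filter runs before ST_PolygonFromText when show() is executed, so the offending rows are removed before they can raise the exception.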