We are calling a PySpark function on a DataFrame, and it throws an error. The error is most likely caused by a faulty row in the DataFrame.
The schema of the DataFrame looks like this:
root
|-- geo_name: string (nullable = true)
|-- geo_latitude: double (nullable = true)
|-- geo_longitude: double (nullable = true)
|-- geo_bst: integer (nullable = true)
|-- geo_bvw: integer (nullable = true)
|-- geometry_type: string (nullable = true)
|-- geometry_polygon: string (nullable = true)
|-- geometry_multipolygon: string (nullable = true)
|-- polygon: geometry (nullable = false)
I converted the CSV column "geometry_polygon" to the geometry-type column "polygon" like this:
station_groups_gdf.createOrReplaceTempView("station_gdf")
spatial_station_groups_gdf = spark_sedona.sql("SELECT *, ST_PolygonFromText(station_gdf.geometry_polygon, ',') AS polygon FROM station_gdf")
Example input data looks like this:
-RECORD 0-------------------------------------
geo_name | Neckarkanal
geo_latitude | 49.486697
geo_longitude | 8.504944
geo_bst | 0
geo_bvw | 0
geometry_type | Polygon
geometry_polygon | 8.4937, 49.4892, ...
geometry_multipolygon | null
polygon | POLYGON ((8.4937 ...
The error occurs simply by calling:
df.show()
The error:
java.lang.IllegalArgumentException: Points of LinearRing do not form a closed linestring
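For reference, the failure can be reproduced with a minimal coordinate list whose first and last points differ (a sketch, assuming the same spark_sedona session as above). Note that the job only fails once .show() forces evaluation, because Spark is lazy:
bad_df = spark_sedona.createDataFrame(
    [('bad', '8.49, 49.48, 8.50, 49.49, 8.51, 49.48')],
    ['geo_name', 'geometry_polygon']
)
bad_df.createOrReplaceTempView("bad_gdf")
# The ring is not closed, so JTS rejects it when the polygon is built.
spark_sedona.sql(
    "SELECT ST_PolygonFromText(geometry_polygon, ',') AS polygon FROM bad_gdf"
).show()  # raises the IllegalArgumentException only here, at evaluation time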
To pinpoint these rows, we would like to iterate through the DataFrame and apply a function that deletes the invalid values, something like this:
dataframe.where(dataframe.polygon == valid).show()
dataframe.filter(dataframe.polygon == valid).show()
Do you know the best way to iterate row by row and delete invalid values, without materializing the PySpark DataFrame in its entirety (which triggers the error message and aborts the job)?
Since you have a DataFrame, a pandas_udf check should work well. The function itself may not look very nice, but it works. In the example below, the row with "geo_name" = X has an invalid polygon string, so no polygon is created for it in the output.
Input:
df = spark_sedona.createDataFrame(
    [('A', '-74, 40, -73, 39, -75, 38, -74, 40'),
     ('X', '-11'),
     ('Y', None),
     ('B', '-33, 50, -30, 38, -40, 27, -33, 50')],
    ['geo_name', 'geometry_polygon']
)
Script:
from pyspark.sql import functions as F
import pandas as pd
from shapely.geometry import Polygon
@F.pandas_udf('string')
def nullify_invalid_polygon(ser: pd.Series) -> pd.Series:
    def nullify(s):
        try:
            # Pair the flat "x1, y1, x2, y2, ..." string into (x, y) tuples.
            p_shell = list(zip(*[iter(map(float, s.split(',')))] * 2))
            # Keep the string only if it forms a non-empty, valid polygon.
            return s if Polygon(p_shell).is_valid and p_shell != [] else None
        except (ValueError, AttributeError):
            # Unparsable string or None -> treat as invalid.
            return None
    return ser.map(nullify)
df = df.withColumn('geometry_polygon', nullify_invalid_polygon('geometry_polygon'))
df.createOrReplaceTempView("station_gdf")
df = spark_sedona.sql("SELECT *, CASE WHEN isnull(geometry_polygon) THEN null ELSE ST_PolygonFromText(geometry_polygon, ',') END AS polygon FROM station_gdf")
Result:
df.printSchema()
# root
# |-- geo_name: string (nullable = true)
# |-- geometry_polygon: string (nullable = true)
# |-- polygon: geometry (nullable = true)
df.show(truncate=0)
# +--------+----------------------------------+------------------------------------------+
# |geo_name|geometry_polygon |polygon |
# +--------+----------------------------------+------------------------------------------+
# |A |-74, 40, -73, 39, -75, 38, -74, 40|POLYGON ((-74 40, -73 39, -75 38, -74 40))|
# |X |null |null |
# |Y |null |null |
# |B |-33, 50, -30, 38, -40, 27, -33, 50|POLYGON ((-33 50, -30 38, -40 27, -33 50))|
# +--------+----------------------------------+------------------------------------------+
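If you would rather drop the invalid rows entirely instead of keeping them with a null polygon, a simple filter on the result works (a minimal sketch):
from pyspark.sql import functions as F

# Keep only the rows for which a valid polygon could be built.
df_clean = df.filter(F.col('polygon').isNotNull())
df_clean.show(truncate=0)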
The idea is to apply Polygon.is_valid. But since in a few cases it throws errors instead of returning False, the check is wrapped in try...except.
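To illustrate both behaviors, here is a minimal shapely sketch: a self-intersecting ring can be constructed and is merely reported as invalid, while too few coordinates already fail at construction, so is_valid is never reached:
from shapely.geometry import Polygon

# Self-intersecting "bow tie": can be constructed, but is invalid.
print(Polygon([(0, 0), (1, 1), (1, 0), (0, 1)]).is_valid)  # False

# Too few coordinates: construction itself raises a ValueError
# (the exact message varies by shapely version), hence the try...except.
try:
    Polygon([(0, 0), (1, 1)])
except ValueError as e:
    print(e)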