apache-spark, validation, pyspark, polygon, apache-sedona

Remove rows with invalid polygon values in a PySpark data frame?


We are using a PySpark function on a data frame which throws us an error. The error is most likely due to a faulty row in the data frame.

Schema of data frame looks like:

root
|-- geo_name: string (nullable = true)
|-- geo_latitude: double (nullable = true)
|-- geo_longitude: double (nullable = true)
|-- geo_bst: integer (nullable = true)
|-- geo_bvw: integer (nullable = true)
|-- geometry_type: string (nullable = true)
|-- geometry_polygon: string (nullable = true)
|-- geometry_multipolygon: string (nullable = true)
|-- polygon: geometry (nullable = false)

I have converted the column "geometry_polygon" in CSV to the geometry type column "polygon" like this:

station_groups_gdf.createOrReplaceTempView("station_gdf")
spatial_station_groups_gdf = spark_sedona.sql("SELECT *, ST_PolygonFromText(station_gdf.geometry_polygon, ',') AS polygon FROM station_gdf")

Example input data looks like this:

-RECORD 0-------------------------------------
 geo_name              | Neckarkanal         
 geo_latitude          | 49.486697           
 geo_longitude         | 8.504944            
 geo_bst               | 0                   
 geo_bvw               | 0                   
 geometry_type         | Polygon             
 geometry_polygon      | 8.4937, 49.4892, ...
 geometry_multipolygon | null                
 polygon               | POLYGON ((8.4937 ...  

The error occurs with just calling:

df.show()

The error:

java.lang.IllegalArgumentException: Points of LinearRing do not form a closed linestring


To pinpoint these rows, we would like to iterate through the data frame and apply a function that deletes the invalid values. Something like this:

dataframe.where(dataframe.polygon == valid).show()
dataframe.filter(dataframe.polygon == valid).show()

Do you know the best way to go through the data frame row by row and delete invalid values, without ever materializing the PySpark data frame in its entirety (which triggers the error message and aborts the job)?


Solution

  • Since you have a dataframe, a pandas_udf check should work well. The function itself may not look very pretty, but it works. In the example below, the row with "geo_name" = X has an invalid polygon string, and in the output no polygon is created for that row.

    Input:

    df = spark_sedona.createDataFrame(
        [('A', '-74, 40, -73, 39, -75, 38, -74, 40'),
         ('X', '-11'),
         ('Y', None),
         ('B', '-33, 50, -30, 38, -40, 27, -33, 50')],
        ['geo_name', 'geometry_polygon']
    )
    

    Script:

    from pyspark.sql import functions as F
    import pandas as pd
    from shapely.geometry import Polygon
    
    @F.pandas_udf('string')
    def nullify_invalid_polygon(ser: pd.Series) -> pd.Series:
        def nullify(s):
            try:
                # pair the flat "x, y, x, y, ..." string into coordinate tuples
                p_shell = list(zip(*[iter(map(float, s.split(',')))] * 2))
                # keep the original string only if it forms a valid, non-empty polygon
                return s if Polygon(p_shell).is_valid and p_shell != [] else None
            except (ValueError, AttributeError):
                # None or an unparsable string -> nullify the value
                return None
        return ser.map(nullify)
    
    df = df.withColumn('geometry_polygon', nullify_invalid_polygon('geometry_polygon'))
    
    df.createOrReplaceTempView("station_gdf")
    df = spark_sedona.sql("SELECT *, CASE WHEN isnull(geometry_polygon) THEN null ELSE ST_PolygonFromText(geometry_polygon, ',') END AS polygon FROM station_gdf")
    

    Result:

    df.printSchema()
    # root
    #  |-- geo_name: string (nullable = true)
    #  |-- geometry_polygon: string (nullable = true)
    #  |-- polygon: geometry (nullable = true)
    
    df.show(truncate=0)
    # +--------+----------------------------------+------------------------------------------+
    # |geo_name|geometry_polygon                  |polygon                                   |
    # +--------+----------------------------------+------------------------------------------+
    # |A       |-74, 40, -73, 39, -75, 38, -74, 40|POLYGON ((-74 40, -73 39, -75 38, -74 40))|
    # |X       |null                              |null                                      |
    # |Y       |null                              |null                                      |
    # |B       |-33, 50, -30, 38, -40, 27, -33, 50|POLYGON ((-33 50, -30 38, -40 27, -33 50))|
    # +--------+----------------------------------+------------------------------------------+
    

    The idea is to rely on Polygon.is_valid. But since in a few cases constructing the polygon throws an error instead of returning False, the check is wrapped in try...except.
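
    If you prefer to drop the offending rows entirely (as in your filter example) rather than nullify them, the same check can be expressed as a boolean pandas_udf and used with filter. A minimal sketch along the same lines; the is_valid_polygon name is just for illustration, and as above shapely has to be available on the workers:

    from pyspark.sql import functions as F
    import pandas as pd
    from shapely.geometry import Polygon
    
    @F.pandas_udf('boolean')
    def is_valid_polygon(ser: pd.Series) -> pd.Series:
        def check(s):
            try:
                # pair the flat "x, y, x, y, ..." string into coordinate tuples
                shell = list(zip(*[iter(map(float, s.split(',')))] * 2))
                return bool(shell) and Polygon(shell).is_valid
            except (ValueError, AttributeError):
                # None or an unparsable string counts as invalid
                return False
        return ser.map(check)
    
    # keep only the rows whose polygon string parses into a valid ring
    df = df.filter(is_valid_polygon('geometry_polygon'))

    After the filter, ST_PolygonFromText should be safe to apply to the remaining rows without the CASE WHEN guard.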