Context
I have 500,000+ road objects for all the roads in the state of Illinois that have a Geoshape property for a line. I additionally have a set of objects for points across the state.
Need
I would like to add to the backing dataset of the points object type a column for the ID of the closest road to each point. Most roads are within 50m of the points so this fact could help optimize whatever method is chosen.
Previous attempts
I tried using the DataFrame.knn_join() method part of the geospatial-tools library native to Palantir. However, testing it shows that it apparently finding the closest line to a point doesn't work. Only finding the closest point to a point works. It also takes very long as well.
I also tried doing a DataFrame.distance_join() however that returns all the objects within a distance and I only want the closest. I guess I could get all the roads within 50 m of the points and then calculate the distance between each of the results and the point and find the minimum but that seems like overkill and it will exclude roads more than 50m away.
I finally thought of using another library instead of geospatial-tools to do what I want. I asked ChatGPT how I could do this and it came up with part of this code that uses GeoSpark:
from transforms.api import transform_df, Input, Output
from geospatial_tools import geospatial
from geospark.register import GeoSparkRegistrator
from geospark.core.spatialOperator import JoinQuery
@geospatial()
@transform_df(
Output("ri.foundry.main.dataset.46a58ef8-732f-4bad-9b19-8e3aab9f5d30"),
roads=Input("ri.foundry.main.dataset.32ea817c-1f13-4295-b0a1-345ca38e64d2"),
points=Input("ri.foundry.main.dataset.e0530819-d744-49ac-9e39-91bacd41d199")
)
def compute(ctx, roads, points):
GeoSparkRegistrator.registerAll(ctx.spark_session)
joined_df = JoinQuery.SpatialJoinQuery(points, roads, True, False)
return joined_df
However, when I ran this I got this error:
Java classpath reference error
A Python dependency you are using is attempting to reference a Java jar not in the classpath. Please check recently added Python dependencies, and add a dependency on the necessary Java packages (JARs) in the build.gradle file.
/transforms-python/src/myproject/datasets/nearest-road.py
GeoSparkRegistrator.registerAll(ctx.spark_session)
I'm not sure how to solve this.
Let me know another solution or how to fix this code!
You can do this in code repositories with the caveat that there's no convenient way to take advantage of distributed processing because Apache Sedona's KNN only works for one point at a time and the other solutions use in-memory tools (e.g. geopandas). There was a paper from 2021 about implementing a KNN join query in Sedona, which would be distributed in the normal way Spark jobs can be, and the code appears to exist on a fork of Sedona.
If future readers are more familiar with geopandas or some other geospatial package, that's probably a better approach for them than the code below. My solution doesn't scale to massive data (but many shapefiles aren't actually that big on disk). I tested this code on the Illinois Sinkholes shapefile and the Illinois Roads shapefile from TIGER.
Notes about this code:
from transforms.api import transform, Input, Output
from geospatial_tools import geospatial
from sedona.register.geo_registrator import SedonaRegistrator
from sedona.utils.adapter import Adapter
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.core.spatialOperator import KNNQuery
from sedona.core.enums import IndexType
from shapely import wkt
import logging
from pyspark.sql import Row
logger = logging.getLogger(__name__)
@geospatial()
@transform(
output_df=Output("<output_path>"),
points=Input("<input_points_shapefile_dataset>"),
roads=Input("<input_lines_shapefile_dataset>"),
)
def compute(ctx, points, roads, output_df):
SedonaRegistrator.registerAll(ctx.spark_session)
roads_rdd = ShapefileReader.readToGeometryRDD(
ctx.spark_session.sparkContext, roads.filesystem().hadoop_path
)
roads_rdd.analyze()
points_rdd = ShapefileReader.readToGeometryRDD(
ctx.spark_session.sparkContext, points.filesystem().hadoop_path
)
points_rdd.analyze()
roads_rdd.buildIndex(IndexType.RTREE, False)
points_df = Adapter.toDf(points_rdd, ctx.spark_session)
k = 1
using_index = True
points_list = points_df.collect() # noqa
nearest_roads = []
for point in points_list:
try:
nearest_road = (
KNNQuery.SpatialKnnQuery(
roads_rdd, point.asDict()["geometry"], k, using_index
)
.pop()
.getUserData()
)
except Exception as e:
logger.warn(e)
nearest_road = None
p_dict = point.asDict()
p_dict["nearest_road"] = nearest_road
p_dict["geometry"] = wkt.dumps(p_dict["geometry"])
nearest_roads.append(Row(**p_dict))
points_nearest_df = ctx.spark_session.createDataFrame(nearest_roads)
output_df.write_dataframe(points_nearest_df)