I want to detect all possible "straight" line combinations (as long as criteria are met (condition inside extend_line()
function: if x_sign == new_x_sign and y_sign == new_y_sign and np.sign(slope) == np.sign(new_slope) and abs(slope - new_slope) <= 2
)) out of my sample set of points. Therefore I tried a approach with nesting my line extention function extend_line()
, so the function runs as many times as it needs in order to combine all line segements meeting the criteria. To achive this I calculate the nearby points for every single point and afterwards check with line combinations work out.
The basic idea does work for the first "iteration" and the first "straigt forward" lineament meeting the criteria is found:
But as soon as the latest execution of nested extend_line()
is done, I am having the issue that I am not able to get rid of all line segments, which belong to the former line but do not belong to the current line (the brown and purple line segments should be removed:
I tried manipulating different varibales (like helper_list
) but this did not work and as soon as the nested function was executed and the variables were "reset" to the variables from the original extend_line()
function. I also tried truncating helper_list
, where I store the line segment so it matches the amount of visited points. This seemed to be pretty promising, but the way/idea I had, did not work as desired.
import geopandas as gpd
from shapely.geometry import Point, LineString, MultiLineString
from shapely.ops import linemerge
import numpy as np
import matplotlib.pyplot as plt
from scipy.spatial.distance import cdist
def calc_slope(line):
x1,y1 = line.coords[0]
x2,y2 = line.coords[1]
try:
s = (y2-y1)/(x2-x1)
except:
s = 0
return s
def extend_line(start_line, current_line, current_point, x_sign, y_sign, slope, points, visited_points, nearby_points_dict, helper_list=[]):
# Add current line to helper_list if first "iteration"
if len(visited_points) == 1:
helper_list.append(current_line)
# Add the current point to visited points
visited_points.append(tuple(current_point))
# Find nearby points that have not been visited yet
key = np.where(points == current_point)
nearby_points = [p for p in nearby_points_dict[key[0][0]] if tuple(p) not in visited_points]
# Iterate over nearby points to extend the line
for next_point in nearby_points:
# # Truncate list to match amount of visited points
# if len(visited_points) < len(helper_list):
# helper_list = helper_list[:len(visited_points)]
# Create a new line from current point to next point
new_line = LineString([current_point, next_point])
# Check for signs in order to extent only points "in front"
new_dx = current_point[0] - next_point[0]
new_x_sign = np.sign(new_dx)
new_dy = current_point[1] - next_point[1]
new_y_sign = np.sign(new_dy)
# Calculate slope of new line segment
new_slope = calc_slope(new_line)
# Check if conditions are met (same sign -> point in front, same lope sign -> same tilted line segment, slope witin threshold -> keeps "straight" line)
if x_sign == new_x_sign and y_sign == new_y_sign and np.sign(slope) == np.sign(new_slope) and abs(slope - new_slope) <= 2:
# Append valid line to the list of valid lines
helper_list.append(new_line)
# Recursively extend the line from the next point
extend_line(start_line, new_line, next_point, x_sign, y_sign, new_slope, points, visited_points.copy(), nearby_points_dict, helper_list)
if len(helper_list) > 1:
mls = MultiLineString(helper_list)
ls = linemerge(mls)
valid_lines.append(ls)
fig, ax = plt.subplots(figsize=(8, 8))
gdf.plot(ax=ax, marker='o', color='gray', markersize=50, label='Original Points')
try:
ax.plot(*ls.xy)
except:
for geom in mls.geoms:
ax.plot(*geom.xy)
plt.show()
# Sample data for GeoDataFrame
data = {
'id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
21, 22, 23, 24, 25, 26, 27, 28, 29, 30],
'x': [0.0, 1.0, 3.0, 2.0, 5.0, 4.0, 7.0, 6.0, 9.0, 8.0,
1.5, 2.5, 4.5, 3.5, 6.5, 5.5, 8.5, 7.5, 10.0, 9.0,
3.0, 4.0, 6.0, 5.0, 8.0, 7.0, 10.0, 9.0, 11.0, 10.0],
'y': [0.0, 2.0, 1.0, 3.0, 0.5, 4.0, 2.5, 5.0, 1.5, 6.0,
3.5, 5.0, 2.0, 6.0, 4.5, 8.0, 3.0, 7.0, 5.5, 9.0,
7.0, 8.0, 6.0, 9.0, 8.5, 10.0, 7.5, 11.0, 9.0, 12.0]
}
# Create GeoDataFrame
geometry = [Point(xy) for xy in zip(data['x'], data['y'])]
gdf = gpd.GeoDataFrame(data, geometry=geometry)
gdf["id"] = gdf["id"] - 1
points = gdf[['x', 'y']].values
# Calculate distance matrix
pairwise_distances = cdist(points, points)
# Find indices of nearby points based on the threshold distance
nearby_indices = np.where((pairwise_distances > 1) & (pairwise_distances < 3))
# Exclude self-distances (where distance is zero)
nearby_indices = np.array([i for i in zip(*nearby_indices) if i[0] != i[1]])
# Create a list to store nearby points for each point
nearby_points_dict = {}
# Loop over each point to collect nearby points
for i in range(len(points)):
nearby_points = points[nearby_indices[nearby_indices[:,0]==i,1]]
nearby_points_dict[i] = nearby_points
all_lines = []
for i, nearby_points in nearby_points_dict.items():
start_point = points[i]
# Plot sample points and area to be considered nearby
fig, ax = plt.subplots(figsize=(8, 8))
gdf.plot(ax=ax, marker='o', color='gray', markersize=50, label='Original Points')
for k, v in gdf.iterrows():
ax.annotate(f'{v["id"]}\n({v["x"]},{v["y"]})',(v["x"],v["y"]), ha='center')
ax.plot(points[i,0],points[i,1], marker='o', color='blue', label='Current Point')
circle = plt.Circle(points[i], 3, color="green", alpha=0.25)
ax.add_patch(circle)
# Iterate over all nearby_points of start_point
for point in nearby_points:
end_point = point
# Check signs of points in order to only extend points "in front" of start_point
dx = start_point[0] - end_point[0]
x_sign = np.sign(dx)
dy = start_point[1] - end_point[1]
y_sign = np.sign(dy)
line = LineString([start_point, end_point])
ax.plot(point[0],point[1], marker='o', color='red', linestyle='', label='Nearby Points')
# Calculate slope of line in order to only extend with line segments within slope threshold
slope = calc_slope(line)
visited_points = [tuple(start_point)]
valid_lines = []
extend_line(line, line, end_point, x_sign, y_sign, slope, points, visited_points, nearby_points_dict)
all_lines.append(valid_lines)
break
Edit: Also wondering why I need to access key
inside extend_line()
function by using key[0][0]
if every point in points
is there only once? Why does key
printed out return (array([ 1, 1, 12], dtype=int64), array([0, 1, 1], dtype=int64))
for the first exectution of extent_line()
and not only eg (array([1,], dtype=int64)
?
If I understood your constraints correctly, this seems to do the trick with a recursive function:
import math
from functools import lru_cache
import matplotlib.pyplot as plt
import numpy as np
points = {
1: (0.0, 0.0),
2: (1.0, 2.0),
3: (3.0, 1.0),
4: (2.0, 3.0),
5: (5.0, 0.5),
6: (4.0, 4.0),
7: (7.0, 2.5),
8: (6.0, 5.0),
9: (9.0, 1.5),
10: (8.0, 6.0),
11: (1.5, 3.5),
12: (2.5, 5.0),
13: (4.5, 2.0),
14: (3.5, 6.0),
15: (6.5, 4.5),
16: (5.5, 8.0),
17: (8.5, 3.0),
18: (7.5, 7.0),
19: (10.0, 5.5),
20: (9.0, 9.0),
21: (3.0, 7.0),
22: (4.0, 8.0),
23: (6.0, 6.0),
24: (5.0, 9.0),
25: (8.0, 8.5),
26: (7.0, 10.0),
27: (10.0, 7.5),
28: (9.0, 11.0),
29: (11.0, 9.0),
30: (10.0, 12.0),
}
@lru_cache(maxsize=128)
def calc_slope(x1, y1, x2, y2):
try:
return (y2 - y1) / (x2 - x1)
except ZeroDivisionError:
return 0
def get_candidate_point_ids(line: list[int]):
skip_point_ids = set(line)
last_x, last_y = points[line[-1]]
if len(line) > 1: # Not the first segment?
last_slope = calc_slope(*points[line[-2]], last_x, last_y)
else:
last_slope = None
for point_id, (x, y) in points.items():
if point_id in skip_point_ids:
continue
dx = last_x - x
dy = last_y - y
if dx < 0 or dy < 0: # Only consider points in front
continue
if 1 < np.hypot(dx, dy) < 3: # Only consider points within 1..3 units
continue
if last_slope is not None:
# Verify things against the previous segment if needed
slope = calc_slope(last_x, last_y, x, y)
if np.sign(slope) != np.sign(last_slope):
continue
if abs(slope - last_slope) > 2:
continue
yield point_id
def get_lines(line):
for next_point_id in get_candidate_point_ids(line):
new_line = [*line, next_point_id]
yield new_line
yield from get_lines(new_line)
def main():
for start_point_id in points:
for line in get_lines([start_point_id]):
x, y = zip(*(points[point_id] for point_id in line))
plt.plot(x, y, marker="o", label=f"{line}")
plt.show()
main()
The output here is a bit messy since it's overlaying every possible polyline from all start points.