I want to identify the peaks in a segment of data (selecting the top 7 points with the highest prominences); they are clearly visible to the naked eye. However, I cannot obtain the expected result with the find_peaks function.
The data is accessible in this gist.
Error result: if I directly use find_peaks:

find_peaks(series, prominence=np.max(series) * 0.1, distance=48)
and then select the top 7 points with the highest prominences, I end up with some undesired points.
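For reference, the full snippet looks roughly like this (a sketch, with series being the pandas Series loaded from the gist):

import numpy as np
from scipy.signal import find_peaks

# series is the pandas Series from the gist
peaks, props = find_peaks(series, prominence=np.max(series) * 0.1, distance=48)
# keep the 7 detected peaks with the highest prominences
top7 = peaks[np.argsort(props["prominences"])[-7:]]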
Clumsy Method: I can first smooth the data:
percentile_80 = series.rolling(
    window=61, center=True, min_periods=1
).apply(lambda x: np.percentile(x, 80))
smoothed_series = series - percentile_80
Then, use find_peaks(smoothed_series, prominence=np.max(smoothed_series) * 0.1, distance=48) and select the top 7 points with the highest prominences, which yields the expected results.
However, this approach is much slower.
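Putting the clumsy method together (a sketch, with the same top-7-by-prominence selection as above):

peaks, props = find_peaks(smoothed_series, prominence=np.max(smoothed_series) * 0.1, distance=48)
top7 = peaks[np.argsort(props["prominences"])[-7:]]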
Edit on 2025.1.9: Thanks mozway, this is a good method. I also found another way to speed things up: first find all candidate peaks, then compare each peak's prominence with those of its neighboring peaks and keep only the peaks that stand out from their neighbors. Is this a good method?
import numpy as np
from scipy.signal import find_peaks


def find_significant_peaks(x, prominence_diff_ratio=0.1, initial_distance=3):
    # Step 1: get all candidate peaks and their properties
    peaks, properties = find_peaks(
        x, distance=initial_distance, prominence=np.max(x) * 0.01
    )
    if len(peaks) == 0:
        return peaks, properties
    # Prominences of all candidate peaks
    prominences = properties["prominences"]
    # Pairwise absolute prominence differences, computed vectorized
    diffs = np.abs(np.subtract.outer(prominences, prominences))
    threshold_values = prominences * prominence_diff_ratio
    valid_peaks_mask = np.ones(len(peaks), dtype=bool)
    # Step 2: for each peak, check local maximality and the prominence
    # margin over its neighboring peaks
    compare_num = 10  # number of neighboring peaks to compare on each side
    for i in range(len(peaks)):
        # Local window of candidate peaks around peak i
        start_idx = max(0, i - compare_num)
        end_idx = min(len(peaks), i + 1 + compare_num)
        local_prominences = prominences[start_idx:end_idx]
        current_prominence = prominences[i]
        # Condition 1: the peak must have the largest prominence in its window
        if current_prominence < np.max(local_prominences):
            valid_peaks_mask[i] = False
            continue
        # Condition 2: its prominence must exceed every neighbor's by more
        # than the threshold; remove the self-comparison by index (filtering
        # out zeros would also wrongly drop neighbors with tied prominences)
        neighbor_diffs = np.delete(diffs[i, start_idx:end_idx], i - start_idx)
        if np.any(neighbor_diffs <= threshold_values[i]):
            valid_peaks_mask[i] = False
    # Step 3: filter the peaks and every array in the properties dict
    valid_peaks = peaks[valid_peaks_mask]
    valid_properties = {
        key: values[valid_peaks_mask] for key, values in properties.items()
    }
    return valid_peaks, valid_properties
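It can be used like this (a sketch on the same data, with the same top-7 selection):

peaks, props = find_significant_peaks(series.to_numpy())
top7 = peaks[np.argsort(props["prominences"])[-7:]]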
The issue with your approach is that you rely on the prominence, which is the local height of the peaks and not a good fit for your type of data.
From your total dataset, it indeed looks clear to the naked eye that there are high "peaks" relative to the top of the large blue area, but this is no longer obvious once we consider the exact local data:
NB: the insets share the same Y-axis scale.
Also, let's compute the prominence of all peaks (see how the middle peak has a much greater prominence):
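Something along these lines (a sketch using scipy.signal.peak_prominences):

from scipy.signal import find_peaks, peak_prominences

all_peaks, _ = find_peaks(series.to_numpy())  # every local maximum, unfiltered
all_prominences = peak_prominences(series.to_numpy(), all_peaks)[0]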
As you can see, there are peaks everywhere and what you would define as a peak in the left inset is actually a relatively small peak compared to peaks that you would not want to detect in the right inset.
What you want is a peak that is higher than the surrounding peaks, while fully ignoring the baseline; your approach of using a smoothing function to capture the local trend is therefore sound.
Since your issue seems to be about speed, you can greatly improve it by using the native rolling.quantile instead of a custom rolling.apply with np.percentile:
import numpy as np
from scipy.signal import find_peaks

percentile_80 = series.rolling(window=61, center=True, min_periods=1).quantile(0.8)
smoothed_series = series.sub(percentile_80).clip(lower=0)
peaks, peak_data = find_peaks(smoothed_series, prominence=np.max(smoothed_series) * 0.1, distance=48)
series.plot()
series.loc[smoothed_series.iloc[peaks].nlargest(7).index].plot(ls='', marker='o')
This runs in just a few milliseconds, compared to more than one second for the custom apply:
# series.rolling(window=61, center=True, min_periods=1).apply(lambda x: np.percentile(x, 80))
1.47 s ± 25 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# series.rolling(window=61, center=True, min_periods=1).quantile(0.8)
3.9 ms ± 56.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Output:
I also added a clip step after smoothing to get the following intermediate:
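The intermediate itself can be reproduced with a one-liner (using pandas' matplotlib-based plotting):

smoothed_series.plot()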