I have evenly spaced measurement data from two different spectrometers and I would like to resample the data to fit a standardized wavelength scale. The dataset from one spectrometer will be upsampled (from a roughly 3 nm to a 1 nm scale), the other will be downsampled (from a roughly 0.3 nm to a 1 nm scale). The latter in particular will be noisy, so I don't simply want to interpolate between the nearest neighbors and lose all the data in between. While the upsampling is probably just an interpolation, I find it surprisingly hard to find a function that solves the downsampling task.
This is how I imagine an ideal downsampling solution:
The theoretical spectrum with 1 nm steps contains the number of photons collected within theoretical pixel borders:
WL index [nm] | left border of pixel [nm] | right border of pixel [nm] |
---|---|---|
400 | 399.5 | 400.5 |
401 | 400.5 | 401.5 |
... | ... | ... |
Whereas my real spectrometer collected photons with these pixel borders:
WL index [nm] | left border of pixel [nm] | right border of pixel [nm] |
---|---|---|
400 | 399.85 | 400.15 |
400.3 | 400.15 | 400.45 |
400.6 | 400.45 | 400.75 |
400.9 | 400.75 | 401.05 |
401.2 | 401.05 | 401.35 |
401.5 | 401.35 | 401.65 |
... | ... | ... |
To obtain the theoretical 401 nm data point, I would take the average of the data points at 400.9 nm and 401.2 nm, plus the data points at 400.6 nm and 401.5 nm weighted by how much they overlap with my theoretical pixel (0.25/0.3 and 0.15/0.3 respectively). You could call this a moving-window filter with a trapezoidal shape; the trapezoid accounts for the weighting of the data points that do not fully fall into the new pixel range.
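Purely for illustration, here is a minimal sketch of that weighted average for the 401 nm pixel; the photon counts are made up, only the overlap weights come from the table above:

```python
import numpy as np

# hypothetical photon counts measured at 400.6, 400.9, 401.2 and 401.5 nm
counts = np.array([120.0, 130.0, 128.0, 125.0])

# fraction of each real pixel (0.3 nm wide) that falls inside the
# theoretical 401 nm pixel spanning 400.5 .. 401.5 nm
weights = np.array([0.25 / 0.3, 1.0, 1.0, 0.15 / 0.3])

value_401 = np.average(counts, weights=weights)
```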
Is there a function with this, or at least similar, behavior in a common Python package?
Here's what I consider not applicable:

- `scipy.signal.resample` assumes periodic data, which is not the case here
- `scipy.signal.resample_poly` takes an integer resampling factor instead of target values
- `scipy.signal.decimate` takes an integer resampling factor instead of target values
- `pandas.Series.resample` requires time as the x-axis
- `scipy.interpolate.interp1d`: interpolation does not take the average but forces the interpolation curve through every (noisy) data point, so it would not make use of the oversampled data points
I know that I could combine e.g. `scipy.signal.decimate` with a factor of 3 (which is not precisely correct) and then interpolate to the target wavelengths, roughly as in the sketch below, but I feel there should be a function that does this properly, with non-integer factors, all in one go. Maybe even for non-evenly spaced data, because you never know what data my next spectrometer is going to output. Is there such a function?
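For reference, a minimal sketch of that workaround, assuming an example 0.3 nm raw grid and a 1 nm target grid; the integer factor of 3 only approximates the true ratio:

```python
import numpy as np
from scipy.signal import decimate

# assumed example grids: ~0.3 nm raw sampling, 1 nm target grid
x_origin = np.arange(300, 1200, 0.3)
y_origin = np.sin(x_origin / 50)
x_goal = np.arange(300, 1200, 1.0)

q = 3                                            # integer decimation factor (only approximate here)
y_dec = decimate(y_origin, q, zero_phase=True)   # low-pass filter, then keep every q-th sample
x_dec = x_origin[::q]                            # wavelengths of the kept samples
y_goal = np.interp(x_goal, x_dec, y_dec)         # linear interpolation onto the 1 nm grid
```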
Thanks for your input! I made my own downsampling function using averages with the trapezoidal weighting explained in the question. Other options are probably quicker and most likely there is not much difference in the output, but I think the trapezoidal method is "photon conservative", i.e. the number of photons in the original data equals the number of photons in the downsampled data, which felt like the right thing to do.
Here's the code:
from math import floor, ceil
import numpy as np
from numpy.typing import NDArray
import matplotlib.pyplot as plt
def downsample(x_goal: NDArray, x_origin: NDArray, y_origin: NDArray) -> NDArray:
    """Downsampling using a trapezoidal moving filter for monotonically increasing and equally
    spaced sample points.

    :param x_goal: The x-coordinates at which to evaluate the interpolated values. Must be equally
        spaced and monotonically increasing. 1D array.
    :param x_origin: The x-coordinates of the data points. Must be equally spaced and monotonically
        increasing. 1D array.
    :param y_origin: The y-coordinates of the data points, same length as `x_origin`.
    :return: Downsampled y-coordinates at the target sampling points, same length as `x_goal`.
    """
    y_goal = np.zeros(x_goal.shape)
    dx_origin = (x_origin[-1] - x_origin[0]) / (len(x_origin) - 1)
    dx_goal = (x_goal[-1] - x_goal[0]) / (len(x_goal) - 1)
    # pad the origin data to avoid edge effects
    missing_left = floor((x_origin[0] - (x_goal[0] - dx_goal/2 - dx_origin/2)) / dx_origin)
    missing_right = floor(((x_goal[-1] + dx_goal/2 + dx_origin/2) - x_origin[-1]) / dx_origin)
    end_values = [x_origin[0] - missing_left*dx_origin, x_origin[-1] + missing_right*dx_origin]
    x_origin = np.pad(x_origin, np.clip((missing_left, missing_right), 0, None),
                      mode="linear_ramp",
                      end_values=end_values)
    y_origin = np.pad(y_origin, np.clip((missing_left, missing_right), 0, None), mode="edge")
    for goal_id, x_g in enumerate(x_goal):
        # outer edges of the trapezoid: origin pixels beyond these do not contribute at all,
        # inner ("full") edges: origin pixels between these lie completely inside the goal pixel
        leftmost_x = x_g - dx_goal/2 - dx_origin/2
        leftmost_full_x = x_g - dx_goal/2 + dx_origin/2
        leftmost_id = ceil((leftmost_x - x_origin[0]) / dx_origin)
        rightmost_x = x_g + dx_goal/2 + dx_origin/2
        rightmost_full_x = x_g + dx_goal/2 - dx_origin/2
        rightmost_id = ceil((rightmost_x - x_origin[0]) / dx_origin)
        relevant_x_origin = x_origin[leftmost_id:rightmost_id]
        weights = np.zeros(relevant_x_origin.shape)
        for origin_id, x_o in enumerate(relevant_x_origin):
            if x_o <= leftmost_x:
                weights[origin_id] = 0.0
            elif leftmost_x < x_o < leftmost_full_x:
                # rising flank of the trapezoid: partial overlap on the left
                weights[origin_id] = (x_o - x_g + dx_goal/2 + dx_origin/2) / dx_origin
            elif leftmost_full_x <= x_o <= rightmost_full_x:
                weights[origin_id] = 1.0
            elif rightmost_full_x < x_o < rightmost_x:
                # falling flank of the trapezoid: partial overlap on the right
                weights[origin_id] = 1.0 - ((x_o - x_g - dx_goal/2 + dx_origin/2) / dx_origin)
            else:
                weights[origin_id] = 0.0
        y_goal[goal_id] = np.average(y_origin[leftmost_id:rightmost_id], weights=weights)
    return y_goal
plt.figure("linear downsampling")
x_origin = np.arange(399, 405, 0.3)
y_origin = x_origin
x_goal = np.arange(400, 404, 1.0)
plt.scatter(x_origin, y_origin)
plt.scatter(x_goal, downsample(x_goal, x_origin, y_origin))
plt.figure("linear upsampling")
x_origin = np.arange(399, 405, 1.0)
y_origin = x_origin
x_goal = np.arange(400, 404, 0.3)
plt.scatter(x_origin, y_origin)
plt.scatter(x_goal, np.interp(x_goal, x_origin, y_origin))
plt.figure("noisy sin downsampling")
x_origin = np.arange(300, 1200, 0.3)
x_goal = np.arange(300, 1200, 1.0)
y_origin = np.sin(x_origin/50)
y_origin = y_origin + np.random.rand(len(y_origin))/2
plt.scatter(x_origin, y_origin)
plt.scatter(x_goal, downsample(x_goal, x_origin, y_origin))
plt.show()
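As a rough sanity check of the "photon conservative" claim, one can compare the integrated counts before and after downsampling (counts per pixel times pixel width); with the trapezoidal weights the two totals should agree up to small edge effects. A minimal sketch, reusing the arrays from the noisy-sine example above:

```python
dx_origin = 0.3
dx_goal = 1.0
y_goal = downsample(x_goal, x_origin, y_origin)
print("original:   ", np.sum(y_origin) * dx_origin)
print("downsampled:", np.sum(y_goal) * dx_goal)
```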