Tags: python, resampling, downsampling

Resample 1-D data in Python


I have evenly spaced measurement data from two different spectrometers and I would like to resample the data to fit a standardized wavelength scale. The dataset from one spectrometer will be upsampled (from roughly a 3 nm to a 1 nm scale); the other will be downsampled (from roughly a 0.3 nm to a 1 nm scale). The latter in particular will be noisy, so I don't want to simply interpolate between the nearest neighbors and lose all the data in between. While the upsampling is probably just an interpolation, I find it surprisingly hard to find a function that solves this downsampling task.

This is how I imagine an ideal downsampling solution:

The theoretical spectrum with 1 nm steps contains the number of photons collected within theoretical pixel borders:

WL index [nm]   left border of pixel [nm]   right border of pixel [nm]
400             399.5                       400.5
401             400.5                       401.5
...             ...                         ...

Whereas my real spectrometer collected photons within pixel borders:

WL index [nm]   left border of pixel [nm]   right border of pixel [nm]
400             399.85                      400.15
400.3           400.15                      400.45
400.6           400.45                      400.75
400.9           400.75                      401.05
401.2           401.05                      401.35
401.5           401.35                      401.65
...             ...                         ...

To obtain the theoretical 401 nm datapoint, I would take the average of the datapoints at 400.9 nm and 401.2 nm, plus the datapoints at 400.6 nm and 401.5 nm weighted by how much they overlap with my theoretical pixel (0.25/0.3 and 0.15/0.3, respectively). You could call this a moving-window filter with a trapezoidal shape; the trapezoid accounts for the weighting of the datapoints that do not fully fall into the new pixel range.
Is there any function with this, or at least similar, behavior in a common Python package?
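
Spelled out for the 401 nm pixel (borders 400.5 nm and 401.5 nm) using the real pixels from the table above, and with made-up photon counts for illustration:

    import numpy as np

    # hypothetical photon counts measured at 400.6, 400.9, 401.2 and 401.5 nm
    y = np.array([10.2, 9.8, 10.5, 10.1])
    # overlap of each real pixel with the theoretical 401 nm pixel,
    # as a fraction of the real pixel width (0.3 nm)
    w = np.array([0.25 / 0.3, 1.0, 1.0, 0.15 / 0.3])
    y_401 = np.average(y, weights=w)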

Here's what I consider not applicable:

• scipy.signal.resample assumes periodic data, which is not the case here.

• scipy.signal.resample_poly resamples by an integer factor instead of to target values.

• scipy.signal.decimate resamples by an integer factor instead of to target values.

• pandas.Series.resample requires time as the x-axis.

• scipy.interpolate.interp1d does not take averages but requires the interpolation curve to go through every (noisy) datapoint, so it would not make use of the oversampled datapoints.

I know that I could combine e.g. scipy.signal.decimate with a factor of 3 (which is not precisely correct) and then interpolate to the target wavelengths, but I feel there should be a function that does this properly with non-integer factors in one go, maybe even for non-evenly spaced data, because you never know what data my next spectrometer is going to output. Is there such a function?
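
For reference, a minimal sketch of that workaround (my own illustration on a hypothetical 0.3 nm grid; the factor of 3 only approximates the 0.3 nm to 1 nm conversion):

    import numpy as np
    from scipy.signal import decimate

    x_origin = np.arange(300, 1200, 0.3)        # hypothetical ~0.3 nm grid
    y_origin = np.sin(x_origin / 50)
    x_goal = np.arange(310, 1190, 1.0)          # target 1 nm grid

    y_dec = decimate(y_origin, 3)               # low-pass filter, then keep every 3rd sample
    x_dec = x_origin[::3]                       # wavelengths of the kept samples (~0.9 nm apart)
    y_goal = np.interp(x_goal, x_dec, y_dec)    # interpolate onto the target grid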


Solution

  • Thanks for your input! I made my own downsampling function using averages with the trapezoidal weighting explained in the question. Other options are probably quicker, and most likely there is not much difference in the output, but I think the trapezoidal method is certainly "photon conservative", i.e. the number of photons in the original data equals the number of photons in the downsampled data, which felt like the right thing to do to me.

    [figure: data weighting principle]

    Here's the code:

    from math import floor, ceil
    import numpy as np
    from numpy.typing import NDArray
    import matplotlib.pyplot as plt
    
    def downsample(x_goal: NDArray, x_origin: NDArray, y_origin: NDArray) -> NDArray:
        """Downsampling using trapezoidal moving filter for monotonically increasing and equally spaced
        sample points.
    
        :param x_goal: The x-coordinates at which to evaluate the downsampled values. Must be equally
            spaced and monotonically increasing. 1D array.
        :param x_origin: The x-coordinates of the data points. Must be equally spaced and monotonically
            increasing. 1D array.
        :param y_origin: The y-coordinates of the data points, same length as `x_origin`.
        :return: Downsampled y-coordinates at the target sampling points, same length as `x_goal`.
        """
        y_goal = np.zeros(x_goal.shape)
        dx_origin = (x_origin[-1] - x_origin[0]) / (len(x_origin) - 1)
        dx_goal = (x_goal[-1] - x_goal[0]) / (len(x_goal) - 1)
    
        # padding origin data to avoid edge effects
        missing_left = floor((x_origin[0] - (x_goal[0] - dx_goal/2 - dx_origin/2)) / dx_origin)
        missing_right = floor(((x_goal[-1] + dx_goal/2 + dx_origin/2) - x_origin[-1]) / dx_origin)
        end_values = [x_origin[0]-missing_left*dx_origin, x_origin[-1]+missing_right*dx_origin]
        x_origin = np.pad(x_origin, np.clip((missing_left, missing_right), 0, None), 
                          mode="linear_ramp", 
                          end_values=end_values)
        y_origin = np.pad(y_origin, np.clip((missing_left, missing_right), 0, None), mode="edge")
    
        for goal_id, x_g in enumerate(x_goal):
            # Window borders: origin pixels left of leftmost_x or right of rightmost_x
            # do not overlap the goal pixel; pixels between leftmost_full_x and
            # rightmost_full_x are fully contained. In between, the weight ramps
            # linearly, giving the trapezoidal shape.
            leftmost_x = x_g - dx_goal/2 - dx_origin/2
            leftmost_full_x = x_g - dx_goal/2 + dx_origin/2
            leftmost_id = ceil((leftmost_x - x_origin[0]) / dx_origin)
            rightmost_x = x_g + dx_goal/2 + dx_origin/2
            rightmost_full_x = x_g + dx_goal/2 - dx_origin/2
            rightmost_id = ceil((rightmost_x - x_origin[0]) / dx_origin)
    
            relevant_x_origin = x_origin[leftmost_id:rightmost_id]
            weights = np.zeros(relevant_x_origin.shape)
            for i, x_o in enumerate(relevant_x_origin):
                if x_o <= leftmost_x:
                    weights[i] = 0.0
                elif leftmost_x < x_o < leftmost_full_x:
                    # rising edge of the trapezoid: partial overlap on the left
                    weights[i] = (x_o - x_g + dx_goal/2 + dx_origin/2) / dx_origin
                elif leftmost_full_x <= x_o <= rightmost_full_x:
                    weights[i] = 1.0
                elif rightmost_full_x < x_o < rightmost_x:
                    # falling edge of the trapezoid: partial overlap on the right
                    weights[i] = 1.0 - ((x_o - x_g - dx_goal/2 + dx_origin/2) / dx_origin)
                else:
                    weights[i] = 0.0
            y_goal[goal_id] = np.average(y_origin[leftmost_id:rightmost_id], weights=weights)
        return y_goal
    
    plt.figure("linear downsampling")
    x_origin = np.arange(399, 405, 0.3)
    y_origin = x_origin
    x_goal = np.arange(400, 404, 1.0)
    plt.scatter(x_origin, y_origin)
    plt.scatter(x_goal, downsample(x_goal, x_origin, y_origin))
    
    plt.figure("linear upsampling")
    x_origin = np.arange(399, 405, 1.0)
    y_origin = x_origin
    x_goal = np.arange(400, 404, 0.3)
    plt.scatter(x_origin, y_origin)
    plt.scatter(x_goal, np.interp(x_goal, x_origin, y_origin))
    
    plt.figure("noisy sin downsampling")
    x_origin = np.arange(300, 1200, 0.3)
    x_goal = np.arange(300, 1200, 1.0)
    y_origin = np.sin(x_origin/50)
    y_origin = y_origin + np.random.rand(len(y_origin))/2
    plt.scatter(x_origin, y_origin)
    plt.scatter(x_goal, downsample(x_goal, x_origin, y_origin))
    plt.show()
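
    As a quick sanity check of the "photon conservative" claim (my own addition, assuming the goal grid lies well inside the origin grid), the integrated signal should be approximately unchanged by the downsampling:

    x_origin = np.arange(300, 1200, 0.3)
    y_origin = np.sin(x_origin / 50) + 1.5              # strictly positive "photon" signal
    x_goal = np.arange(310, 1190, 1.0)                  # stays away from the grid edges
    y_goal = downsample(x_goal, x_origin, y_origin)
    # compare integrals over the region covered by the goal pixels
    mask = (x_origin >= x_goal[0] - 0.5) & (x_origin <= x_goal[-1] + 0.5)
    print(np.sum(y_goal) * 1.0, np.sum(y_origin[mask]) * 0.3)   # nearly equal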