pythonscipylinear-algebraspline

Unstable attempt at mean-preserving quadratic interpolation


I've been looking for an efficient mean-preserving calculation for quadratic interpolation. It doesn't seem to exist. So I tried a bare-bones basic system of linear equations in python and it's numerically unstable even if the input array is completely flat.

The model: every bin of data is of width 1. The value of the bin is identical to the integral of a quadratic polynomial over the bin, such that

Int(a_i+b_ix+c_ix^2,{dx,x-1/2,x+1/2})=data[x,i, x=i]=a_i+b_ix+c_i*(x^2+1/12).

In addition I set values to match between proximate pairs, so that

a_i+b_i*(x+1/2)+c_i*(x+1/2)^2=a_{i+1}+b_{i+1}*(x+1/2)+c_{i+1}*(x+1/2)^2

and so on for the slope.

The knots of the splines are tied at +-1/2, where the negative end of the first set is left dangling, and I define the boundary conditions on the last set. So for a dataset of length n, I've got 3n equations and 3n variables. np.linalg.solve() can take it!

The results are unstable for all input datasets, and immediately unstable for the input dataset [1,2,3]. The system of equations don't get anywhere close to being solved.

I expected integro_quadratic_spline_orig to return I after being given the output of integro_quadratic_spline(I). But alas, it comes nowhere near.

def integro_quadratic_spline(I):
    # Number of intervals
    n = len(I)

    # Initialize coefficients
    a = np.zeros(n)
    b = np.zeros(n)
    c = np.zeros(n)

    # Set up the system of equations
    A = sp.sparse.csr_array((3*n, 3*n))
    B = np.zeros(3*n)

    # Integral conditions


    #ALL CONDITIONS. First line solve, 2nd line continuity
    for i in range(0,n-1):
        A[3*(i),3*(i):3*(i)+6]=[1,(i),(i)**2+1/12,0,0,0] #condition that the integral over the bin matches the original bin value
        A[3*(i)+1,3*(i):3*(i)+6]=[0,1,2*(i+1/2),0,-1,-2*(i+1/2)] #condition that the slope at the right edge of the current bin matches the slope at the left edge of the next bin
        A[3*(i)+2,3*(i):3*(i)+6]=[1,i+1/2,(i+1/2)**2,-1,-(i+1/2),-(i+1/2)**2] #condition that the value at the right edge of the current bin matches the value at the left edge of the next bin
        B[3*(i)]=I[i] #Value of the integral
        B[3*(i)+1]=0 #matching condition,  a'-b'=0
        B[3*(i)+2]=0 #matching condition, a-b=0
    #below is same as above, but for trailing end.  
    i=i+1    
    A[3*(i),3*(i):3*(i)+3]=[1,(i),(i)**2+1/12]
    A[3*(i)+1,3*(i):3*(i)+3]=[0,1,2*(i+1/2)]
    A[3*(i)+2,3*(i):3*(i)+3]=[1,i+1/2,(i+1/2)**2]
    B[3*(i)]=I[i]
    B[3*(i)+1]=0
    B[3*(i)+2]=I[i]

    # Solve the system

    coeffs = sp.sparse.linalg.spsolve(A, B)
    print(np.allclose(np.dot(A.toarray(), coeffs), B))

    # Extract coefficients
    for i in range(n):
        a[i] = coeffs[3*i]
        b[i] = coeffs[3*i+1]
        c[i] = coeffs[3*i+2]

    return a, b, c


def integro_quadratic_spline_return(coeffs, mag):
    original_length = len(coeffs[0])
    new_length=original_length*mag
    newplot=np.empty(new_length)
    for i in range(original_length):
        for j in range(int(mag)):
            newplot[mag*i+j]=coeffs[0][i]+coeffs[1][i]*(i+j/mag-1/2)+coeffs[2][i]*(i+j/mag-1/2)**2

    return newplot

def integro_quadratic_spline_orig(coeffs):
    original_length = len(coeffs[0])
    newplot=np.empty(original_length)
    for i in range(original_length):
        newplot[i]=coeffs[0][i]+coeffs[1][i]*(i)+coeffs[2][i]*((i)**2-12)

    return newplot

Solution

  • Since you haven't answered any questions, here's a series of wild guesses; first I demonstrate with nonlinear methods:

    Nonlinear form

    import functools
    
    import numpy as np
    from matplotlib import pyplot as plt
    from scipy.optimize import minimize, NonlinearConstraint
    
    
    def split_params(params: np.ndarray) -> np.ndarray:
        return params.reshape((3, -1))
    
    
    def spline_evaluate(
        x: np.ndarray,
        a: np.ndarray,
        b: np.ndarray,
        c: np.ndarray,
    ) -> np.ndarray:
        return a*x**2 + b*x + c
    
    
    def spline_diff_evaluate(
        x: np.ndarray,
        a: np.ndarray,
        b: np.ndarray,
    ) -> np.ndarray:
        return 2*a*x + b
    
    
    def spline_integral_evaluate(
        x: np.ndarray,
        a: np.ndarray,
        b: np.ndarray,
        c: np.ndarray,
    ) -> np.ndarray:
        return a*x**3/3 + b*x**2/2 + c*x
    
    
    def spline_cost(
        params: np.ndarray,
    ) -> float:
        """
        Rudimentary cost based on total order-1 and order-2 coefficients ("prefer a flat curve")
        """
        a, b, c = split_params(params)
        return a.dot(a) + b.dot(b)
    
    
    def c0_continuity_residuals(
        params: np.ndarray,
        inner_bounds: np.ndarray,
    ) -> np.ndarray:
        a, b, c = split_params(params)
        y_left = spline_evaluate(
            x=inner_bounds, a=a[:-1], b=b[:-1], c=c[:-1],
        )
        y_right = spline_evaluate(
            x=inner_bounds, a=a[1:], b=b[1:], c=c[1:],
        )
        return y_left - y_right
    
    
    def c1_continuity_residuals(
        params: np.ndarray,
        inner_bounds: np.ndarray,
    ) -> np.ndarray:
        a, b, c = split_params(params)
        yd_left = spline_diff_evaluate(
            x=inner_bounds, a=a[:-1], b=b[:-1],
        )
        yd_right = spline_diff_evaluate(
            x=inner_bounds, a=a[1:], b=b[1:],
        )
        return yd_left - yd_right
    
    
    def get_means(
        params: np.ndarray,
        inner_bounds: np.ndarray,
    ) -> np.ndarray:
        a, b, c = params.reshape((3, -1))[:, 1:-1]
        x0 = inner_bounds[:-1]
        x1 = inner_bounds[1:]
        return (
            spline_integral_evaluate(x=x1, a=a, b=b, c=c) -
            spline_integral_evaluate(x=x0, a=a, b=b, c=c)
        )/(x1 - x0)
    
    
    def get_boundary_conditions(
        params: np.ndarray,
        x: np.ndarray,
    ) -> np.ndarray:
        a, b, c = split_params(params)
        return spline_evaluate(
            x=x[[0, -1]], a=a[[0, -1]], b=b[[0, -1]], c=c[[0, -1]],
        )
    
    
    def solve_spline(x: np.ndarray, y: np.ndarray) -> np.ndarray:
        """
        Solve for spline sections, one per input point, where each section takes the form
        ax**2 + bx + c
        i.e. a quadratic spline with C1 continuity. https://en.wikipedia.org/wiki/Spline_(mathematics)
        This is not interpolation, because the splines do not intersect the input points; they preserve
        section means equal to the input points.
        """
    
        inner_bounds = 0.5*(x[:-1] + x[1:])
        a0 = np.zeros_like(y)
        b0 = np.zeros_like(y)
        c0 = y.copy()
        x0 = np.concatenate((a0, b0, c0))
        epsilon = 1e-12
    
        c0_continuity = NonlinearConstraint(
            fun=functools.partial(c0_continuity_residuals, inner_bounds=inner_bounds),
            lb=-epsilon, ub=epsilon,
        )
    
        c1_continuity = NonlinearConstraint(
            fun=functools.partial(c1_continuity_residuals, inner_bounds=inner_bounds),
            lb=-epsilon, ub=epsilon,
        )
    
        mean_preservation = NonlinearConstraint(
            fun=functools.partial(get_means, inner_bounds=inner_bounds),
            lb=y[1:-1] - epsilon, ub=y[1:-1] + epsilon,
        )
        boundary_conditions = NonlinearConstraint(
            fun=functools.partial(get_boundary_conditions, x=x),
            lb=y[[0, -1]] - epsilon, ub=y[[0, -1]] + epsilon,
        )
    
        result = minimize(
            fun=spline_cost, x0=x0,
            constraints=(
                c0_continuity, c1_continuity, mean_preservation, boundary_conditions,
            ),
        )
        if not result.success:
            raise ValueError(result.message)
        return split_params(result.x)
    
    
    def plot(
        x: np.ndarray,
        y: np.ndarray,
        a: np.ndarray,
        b: np.ndarray,
        c: np.ndarray,
    ) -> plt.Figure:
        fig, ax = plt.subplots()
        ax.scatter(x, y)
    
        xres = np.linspace(start=x[0], stop=0.5*(x[0] + x[1]), num=51)
        ax.plot(xres, spline_evaluate(xres, a[0], b[0], c[0]))
        xres = np.linspace(start=0.5*(x[-2] + x[-1]), stop=x[-1], num=51)
        ax.plot(xres, spline_evaluate(xres, a[-1], b[-1], c[-1]))
    
        for x0, x1, x2, ai, bi, ci in zip(
            x[:-2], x[1: -1], x[2:],
            a[1:-1], b[1:-1], c[1:-1],
        ):
            xleft = 0.5*(x0 + x1)
            xright = 0.5*(x1 + x2)
            xres = np.linspace(start=xleft, stop=xright, num=51)
            yres = spline_evaluate(xres, ai, bi, ci)
            ax.plot(xres, yres)
    
        return fig
    
    
    def demo() -> None:
        rand = np.random.default_rng(seed=0)
        x = np.arange(10)
        y = rand.uniform(low=-1, high=1, size=x.size)
        a, b, c = solve_spline(x, y)
        plot(x, y, a, b, c)
        plt.show()
    
    
    if __name__ == '__main__':
        demo()
    

    splines

    Linear form

    This is equivalent:

    import typing
    
    import numpy as np
    import scipy.sparse as sp
    from matplotlib import pyplot as plt
    from scipy.optimize import minimize, LinearConstraint
    
    
    def split_params(params: np.ndarray) -> np.ndarray:
        return params.reshape((3, -1))
    
    
    def spline_evaluate(
        x: np.ndarray,
        a: np.ndarray, b: np.ndarray, c: np.ndarray,
    ) -> np.ndarray:
        return a*x**2 + b*x + c
    
    
    def spline_cost(params: np.ndarray) -> float:
        """
        Rudimentary cost based on total order-1 and order-2 coefficients ("prefer a flat curve")
        """
        a, b, c = split_params(params)
        return a.dot(a) + b.dot(b)
    
    
    def generate_constraints(
        x: np.ndarray, y: np.ndarray,
    ) -> typing.Iterator[LinearConstraint]:
        n = x.size
        inner_bounds = 0.5*(x[:-1] + x[1:])
    
        # C0 continuity:
        # a0x**2 - a1x**2 + b0x - b1x + c0 - c1 == 0
        yield LinearConstraint(
            A=sp.diags_array(
                (
                    inner_bounds**2, -inner_bounds**2,
                    inner_bounds, -inner_bounds,
                    np.ones(n), -np.ones(n),
                ),
                offsets=(0, 1, n, n+1, 2*n, 2*n+1),
                shape=(n-1, 3*n),
            ),
            lb=0, ub=0,
        )
    
        # C1 continuity:
        # dy1/dx == dy2/dx
        # 2a0x - 2a1x + b0 - b1 == 0
        yield LinearConstraint(
            A=sp.diags_array(
                (
                    2*inner_bounds, -2*inner_bounds,
                    np.ones(n), -np.ones(n),
                ),
                offsets=(0, 1, n, n+1),
                shape=(n-1, 3*n),
            ),
            lb=0, ub=0,
        )
    
        # Mean preservation:
        # (int_x0x1 ax**2 + bx + c dx)/(x1 - x0) == y
        # ax1**3/3 - ax0**3/3 + bx1**2/2 - bx0**2/2 + cx1 - cx0 == y(x1 - x0)
        lbub = y[1:-1]*(inner_bounds[1:] - inner_bounds[:-1])
        yield LinearConstraint(
            A=sp.diags_array(
                (
                    (inner_bounds[1:]**3 - inner_bounds[:-1]**3)/3,
                    (inner_bounds[1:]**2 - inner_bounds[:-1]**2)/2,
                    inner_bounds[1:] - inner_bounds[:-1],
                ),
                offsets=(1, 1+n, 1+2*n),
                shape=(n-2, 3*n),
            ),
            lb=lbub, ub=lbub,
        )
    
        # Boundary conditions:
        # ax**2 + bx + c == y
        yield LinearConstraint(
            A=sp.coo_array(
                (
                    (   # data
                        x[ 0]**2, x[ 0], 1,
                        x[-1]**2, x[-1], 1,
                    ),
                    (   # coordinates
                        (   # y
                            0, 0, 0,
                            1, 1, 1,
                        ),
                        (   # x
                            0,     n,       2*n,
                               n-1,   2*n-1,    3*n-1,
                        ),
                    ),
                ),
            ),
            lb=y[[0, -1]], ub=y[[0, -1]],
        )
    
    
    def solve_spline(x: np.ndarray, y: np.ndarray) -> np.ndarray:
        """
        Solve for spline sections, one per input point, where each section takes the form
        ax**2 + bx + c
        i.e. a quadratic spline with C1 continuity. https://en.wikipedia.org/wiki/Spline_(mathematics)
        This is not interpolation, because the splines do not intersect the input points; they preserve
        section means equal to the input points.
        """
        a0 = np.zeros_like(y)
        b0 = np.zeros_like(y)
        c0 = y.copy()
        x0 = np.concatenate((a0, b0, c0))
    
        result = minimize(
            fun=spline_cost, x0=x0,
            constraints=generate_constraints(x=x, y=y),
        )
        if not result.success:
            raise ValueError(result.message)
        return split_params(result.x)
    
    
    def plot(
        x: np.ndarray, y: np.ndarray,
        a: np.ndarray, b: np.ndarray, c: np.ndarray,
    ) -> plt.Figure:
        fig, ax = plt.subplots()
        ax.scatter(x, y)
    
        xres = np.linspace(start=x[0], stop=0.5*(x[0] + x[1]), num=51)
        ax.plot(xres, spline_evaluate(xres, a[0], b[0], c[0]))
        xres = np.linspace(start=0.5*(x[-2] + x[-1]), stop=x[-1], num=51)
        ax.plot(xres, spline_evaluate(xres, a[-1], b[-1], c[-1]))
    
        for x0, x1, x2, ai, bi, ci in zip(
            x[:-2], x[1: -1], x[2:],
            a[1: -1], b[1: -1], c[1: -1],
        ):
            xleft = 0.5*(x0 + x1)
            xright = 0.5*(x1 + x2)
            xres = np.linspace(start=xleft, stop=xright, num=51)
            yres = spline_evaluate(xres, ai, bi, ci)
            ax.plot(xres, yres)
    
        return fig
    
    
    def demo() -> None:
        rand = np.random.default_rng(seed=0)
        x = np.arange(10)
        y = rand.uniform(low=-1, high=1, size=x.size)
        a, b, c = solve_spline(x, y)
        plot(x, y, a, b, c)
        plt.show()
    
    
    if __name__ == '__main__':
        demo()