Suppose I have a numpy array s of acceleration values representing some signal sampled at a fixed rate dt. I want to compute the cumulative absolute velocity, i.e. np.trapz(np.abs(s), dx=dt).
This is great, except that if dt is "large" (e.g. 0.01) and the signal s is both long and frequently crossing between positive and negative values (an unfortunately common occurrence), an error accumulates from the fact that taking |s| drops the information about the original sign of s. See the picture for a better idea of what this error actually looks like.
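To make the error concrete, here is a small self-contained example (mine, not part of the original code) comparing the naive trapezium rule on |s| with the exact integral of the absolute value of the linear interpolant, for a single segment that crosses zero:
import numpy as np

dt = 0.01
s = np.array([-1.0, 1.0])  # one segment crossing zero exactly halfway

# Naive rule: trapezium on |s| treats the segment as if it never crossed zero
naive = np.trapz(np.abs(s), dx=dt)  # dt / 2 * (1 + 1) = 0.01

# Exact integral of |linear interpolant|: two triangles either side of the crossing
x0 = -s[0] / ((s[1] - s[0]) / dt)  # time of the zero crossing
exact = x0 / 2 * abs(s[0]) + (dt - x0) / 2 * abs(s[1])  # = 0.005

print(naive, exact)  # the naive rule overestimates this segment by a factor of 2
On a segment that crosses zero halfway the naive rule overestimates by a factor of two, and over a long oscillating signal these per-segment errors add up.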
I have some custom code that can correctly account for this error by creating a modified trapezium rule with numba, but there are other very similar functions that I need to implement doing things like np.trapz(np.square(s), dx=dt). Is there an off-the-shelf solution for this sort of numerical integral that:

- is ideally vectorised so that integration can be done for tens of thousands of signals at once in a reasonable time, and
- can be reused for both integrals np.trapz(np.square(s), dx=dt) and np.trapz(np.abs(s), dx=dt), etc.?

For the record, the following parallel numba code is what I am using to integrate the signals:
@numba.njit(parallel=True)
def cav_integrate(
    waveform: npt.NDArray[np.float32], dt: float
) -> npt.NDArray[np.float32]:
    """Compute the Cumulative Absolute Velocity (CAV) of a waveform."""
    cav = np.zeros((waveform.shape[0],), dtype=np.float32)
    for i in range(waveform.shape[0]):
        for j in range(waveform.shape[1] - 1):
            if np.sign(waveform[i, j]) * np.sign(waveform[i, j + 1]) >= 0:
                cav[i] += dt / 2 * (np.abs(waveform[i, j]) + np.abs(waveform[i, j + 1]))
            else:
                slope = (waveform[i, j + 1] - waveform[i, j]) / dt
                x0 = -waveform[i, j] / slope
                cav[i] += x0 / 2 * np.abs(waveform[i, j]) + (dt - x0) / 2 * np.abs(
                    waveform[i, j + 1]
                )
    return cav
I have uploaded a small broadband ground motion simulation to a dropbox link (approx. 91 MiB) for testing. This data comes from a finite difference simulation of a recent earthquake near Wellington, New Zealand, plus some empirically derived high-frequency noise. The file is an HDF5 containing some station data (irrelevant for our purposes) and the simulation waveforms in the "waveforms" key. The array has shape (number of stations, timesteps, components) = (1433, 5876, 3). The 1D numpy array waveform[i, :, j] is the simulated acceleration for the ith station in the jth component. We need to compute the cumulative absolute velocity (CAV) for each component and each station independently. The benchmark code to do this can be found below:
import time
import h5py
import numba
import numpy as np
import numpy.typing as npt
broadband_input_file = h5py.File("broadband.h5", "r")
# Load the entire dataset into memory so that the first method is not arbitrarily slowed down by file I/O
waveforms = np.array(broadband_input_file["waveforms"])
dt = 0.01
start = time.process_time()
cav_naive = np.trapz(np.abs(waveforms), dx=dt, axis=1)
print(f"CAV naive time: {time.process_time() - start}")
@numba.njit
def cav_integrate(
    waveform: npt.NDArray[np.float32], dt: float
) -> npt.NDArray[np.float32]:
    """Compute the Cumulative Absolute Velocity (CAV) of a waveform."""
    cav = np.zeros((waveform.shape[0], waveform.shape[-1]), dtype=np.float32)
    for c in range(waveform.shape[-1]):
        for i in range(waveform.shape[0]):
            for j in range(waveform.shape[1] - 1):
                if np.sign(waveform[i, j, c]) * np.sign(waveform[i, j + 1, c]) >= 0:
                    cav[i, c] += (
                        dt
                        / 2
                        * (np.abs(waveform[i, j, c]) + np.abs(waveform[i, j + 1, c]))
                    )
                else:
                    slope = (waveform[i, j + 1, c] - waveform[i, j, c]) / dt
                    x0 = -waveform[i, j, c] / slope
                    cav[i, c] += x0 / 2 * np.abs(waveform[i, j, c]) + (
                        dt - x0
                    ) / 2 * np.abs(waveform[i, j + 1, c])
    return cav
# Warm up the numba compilation cache
_ = cav_integrate(waveforms, dt)
start = time.process_time()
cav_bespoke = cav_integrate(waveforms, dt)
print(f"Custom CAV time: {time.process_time() - start}")
print(
f"Ratio naive CAV / custom CAV (0, 25, 50, 75, 100% quartiles): {np.percentile(cav_naive / cav_bespoke, [0, 25, 50, 75, 100])}"
)
Which gives the following output
CAV naive time: 0.14353649699999993
Custom CAV time: 0.11182449700000019
Ratio naive CAV / custom CAV (0, 25, 50, 75, 100% quartiles): [1.00607312 1.00999796 1.01163089 1.01318455 1.02221394]
These differences are reasonably small; better examples of larger differences are shown in the comments. Some of the observed waveforms have 20-40% differences between the methods, and even 2% differences might be important for some of the researchers I support. Note also that the CAV calculation is done on a single thread for comparison, but in reality I would parallelise both methods for the largest waveform arrays (which have 6-7x the stations and 10-20x the timesteps, depending on the temporal resolution of the simulation). Funnily enough, the parallel overhead for this small file makes cav_integrate slower than the naive approach if enabled.
We actually do the CAV calculation for all linear combinations cos(theta) * waveform[i, :, 0] + sin(theta) * waveform[i, :, 1], where theta = 0, 1, ..., 180°, to obtain orientation-independent measurements of CAV. This is part of the reason it needs to be fast.
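For reference, a sketch of that rotation sweep (the names here are illustrative, not my actual code, and it assumes the 2-D cav_integrate from the top of this question is in scope rather than the 3-D benchmark version):
import numpy as np

thetas = np.deg2rad(np.arange(0, 181))  # theta = 0, 1, ..., 180 degrees
rotated_cavs = np.empty((len(thetas), waveforms.shape[0]), dtype=np.float32)
for k, theta in enumerate(thetas):
    # Orientation-dependent linear combination of the two horizontal components
    rotated = (np.float32(np.cos(theta)) * waveforms[:, :, 0]
               + np.float32(np.sin(theta)) * waveforms[:, :, 1])
    rotated_cavs[k] = cav_integrate(rotated, dt)  # 2-D variant, one CAV per station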
This answer focuses more on the performance/vectorization aspects than on the numerical integration itself.
"Is ideally vectorised so that integration can be done for tens of thousands of signals at once in a reasonable time?"
Technically, any Numba code that runs with njit (and without errors) is already vectorized according to NumPy's definition (a vectorized function is basically a natively compiled function). However, it can be made faster. The first thing to do is to use multiple threads so the code can benefit from multiple CPU cores.
"Funnily enough the parallel overhead for this small file makes cav_integrate slower than the naive approach if enabled."
This is because there are 2 issues:

- process_time returns the sum of the system and user CPU time (i.e. the amount of parallel work) of the current process. It does not measure the wall-clock time, so the benchmark is biased. You should use time() to measure the wall-clock time instead (see the snippet after this list).
- You need to use prange instead of range, otherwise the loop stays sequential (so the code of the question is not actually parallel).

To efficiently parallelize the code, we should also swap the i-based and c-based loops.
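For the first point, a minimal wall-clock timing snippet (my wording, reusing the names from the question's benchmark) could be:
import time

start = time.time()  # wall-clock time, unlike time.process_time()
cav_bespoke = cav_integrate(waveforms, dt)
print(f"Custom CAV wall-clock time: {time.time() - start:.3f} s")
time.perf_counter() is an equally valid (and higher-resolution) wall-clock timer.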
Moreover, there are other things to consider when it comes to performance:

- Avoid repeatedly updating cav[i, c]; accumulate values in a local variable instead (be careful to use it only within the parallel loop).
- dt is a 64-bit number, so any operation involving it will result in a 64-bit number; convert it to a 32-bit float once up front.
- SIMD instructions can be exploited along the c and j axes (though this requires a different input layout).

Here is the modified code considering all points except the last one (about SIMD and the memory layout):
@numba.njit(parallel=True)
def cav_integrate_opt(
    waveform: npt.NDArray[np.float32], dt: float
) -> npt.NDArray[np.float32]:
    """Compute the Cumulative Absolute Velocity (CAV) of a waveform."""
    cav = np.zeros((waveform.shape[0], waveform.shape[-1]), dtype=np.float32)
    dtf = np.float32(dt)
    half = np.float32(0.5)
    for i in numba.prange(waveform.shape[0]):
        for c in range(waveform.shape[-1]):
            tmp = np.float32(0)
            for j in range(waveform.shape[1] - 1):
                v1 = waveform[i, j, c]
                v2 = waveform[i, j + 1, c]
                if min(v1, v2) >= 0 or max(v1, v2) <= 0:
                    tmp += dtf * (np.abs(v1) + np.abs(v2))
                else:
                    inv_slope = dtf / (v2 - v1)
                    x0 = -v1 * inv_slope
                    tmp += x0 * np.abs(v1) + (dtf - x0) * np.abs(v2)
            cav[i, c] = tmp * half
    return cav
Here are performance results on my AMD Ryzen 5700U CPU (with 8 cores):
naive trapz (seq): 315 ms
initial cav_integrate (seq): 244 ms
optimized cav_integrate (par): 10 ms <-----
The optimized implementation is 25 times faster than cav_integrate and 31 times faster than the naive approach.
For better performance, please consider the last optimization point (about SIMD). That being said, this can be a bit complex to do here: it might require the else branch to be rarely executed (i.e. <5% of iterations) for the SIMD version to be really efficient.
"Can be reused for both integrals np.trapz(np.square(s), dx=dt) and np.trapz(np.abs(s), dx=dt), etc..."
Here are some thoughts:
For np.trapz(np.abs(s), dx=dt), one solution consists in computing the minimum value of the signal, subtracting that minimum from the signal, computing np.trapz of the resulting shifted signal, and finally correcting the result. This solution is more efficient than your current one because it can benefit from SIMD instructions. However, it does not work for np.square.
A generic solution is to add new points close to the problematic areas (using an interpolation function). This solution is not optimal because it increases the computational time, and it is not numerically exact either (though using a lot of points should give a pretty accurate result). You do not need to interpolate all the points nor generate a new array for the whole input: you can do it line by line, or even on the fly (a bit more complicated). This can save a lot of RAM and computation time.
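As a rough illustration of that refinement idea, here is a sketch of the simple whole-array variant; cav_refined and the fixed upsampling factor are my assumptions, not part of the original answer:
import numpy as np

def cav_refined(signal, dt, factor=8):
    """Approximate the integral of |signal| by linearly upsampling before np.trapz.

    The extra points near zero crossings shrink (but do not eliminate) the error.
    """
    n = signal.shape[0]
    t = np.arange(n) * dt
    t_fine = np.linspace(0.0, (n - 1) * dt, (n - 1) * factor + 1)
    fine = np.interp(t_fine, t, signal)  # linear interpolation on a denser grid
    return np.trapz(np.abs(fine), x=t_fine)
The same pattern works with np.square in place of np.abs, which is what makes it generic.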
Another generic solution is to pass a function as a parameter to the Numba function so that the sign-change case can be computed differently. However, this should be significantly slower than your specialized solution because it does not benefit from SIMD instructions and adds an expensive function call that can hardly be inlined.
You can mix the last two solutions to build a generic version which should still be fast once it runs with multiple threads and is optimized like above. The idea is to add just one point where the curve crosses the line y=0 and split the integration into two parts. A linear interpolation should give results similar to cav_integrate_opt (if not equal). Here is an example:
@numba.njit(parallel=True)
def cav_integrate_opt_generic(waveform, dt, fun):
    cav = np.zeros((waveform.shape[0], waveform.shape[-1]), dtype=np.float32)
    dtf = np.float32(dt)
    half = np.float32(0.5)
    for i in numba.prange(waveform.shape[0]):
        for c in range(waveform.shape[-1]):
            tmp = np.float32(0)
            for j in range(waveform.shape[1] - 1):
                v1 = waveform[i, j, c]
                v2 = waveform[i, j + 1, c]
                if min(v1, v2) < 0 and max(v1, v2) > 0:
                    # Basic linear interp
                    # Consider passing another generic function in
                    # parameter to find roots if needed (more expensive).
                    inv_slope = dtf / (v2 - v1)
                    x0 = -v1 * inv_slope
                    tmp += x0 * fun(v1) + (dtf - x0) * fun(v2)
                else:
                    tmp += dtf * (fun(v1) + fun(v2))
            cav[i, c] = tmp * half
    return cav
# Needs to be wrapped in a Numba function for sake of performance
# (so Numba can call it directly like a native C function)
@numba.njit
def numba_abs(y):
    return np.abs(y)

# Note `cav_integrate_opt_generic` is recompiled for each different provided function.
cav_bespoke = cav_integrate_opt_generic(waveforms, dt, numba_abs)
If you want to do that with a higher-order interpolation and integration, then you certainly need to consider more points and a generic root-finding function (which is much more expensive, when analytical solutions are even possible).
It turns out this more generic function is only 5-10% slower for np.abs on my machine, and the results are the same as with cav_integrate_opt.
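The same generic kernel should cover the squared-signal case as well; numba_square below is my own example, not from the original code:
@numba.njit
def numba_square(y):
    return y * y

# Same generic kernel, now integrating the squared signal. Note that the trapezium
# rule on each sub-interval is still only an approximation of the quadratic integrand.
squared_integral = cav_integrate_opt_generic(waveforms, dt, numba_square)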