Given two 2D arrays, I want to calculate the cross-correlation similar to scipy.signal.correlate2d, but instead of using fill_value=0 I want to calculate the Pearson R from the data overlap of the two input arrays. This will lead to higher values further away from the center of the data. If the data shape is rows*cols and the correlation is calculated for a row lag of row_lag and a col lag of col_lag, then the data overlap is (rows - |row_lag|) * (cols - |col_lag|).
Of course, a simple approach would be to write two nested for loops that build the lagged data a_lagged = a[row_lag:, col_lag:] and b_lagged = b[:-row_lag, :-col_lag] (for positive lags) and then calculate the Pearson R of a_lagged and b_lagged. However, this implementation is very slow.
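For a single positive lag pair, the simple approach can be sketched as follows (the array sizes, lags, and RNG seed are made up for illustration); the overlap size matches the formula above:

```python
import numpy as np
from scipy.stats import pearsonr

rng = np.random.default_rng(0)
rows, cols = 8, 10
a = rng.normal(size=(rows, cols))
b = rng.normal(size=(rows, cols))

row_lag, col_lag = 2, 3  # this slicing pattern requires positive lags
a_lagged = a[row_lag:, col_lag:]
b_lagged = b[:-row_lag, :-col_lag]
# overlap size follows (rows - |row_lag|) * (cols - |col_lag|)
assert a_lagged.size == (rows - abs(row_lag)) * (cols - abs(col_lag))
r = pearsonr(a_lagged.ravel(), b_lagged.ravel()).statistic  # Pearson R at this lag
```

Repeating this for every (row_lag, col_lag) pair is what makes the naive version slow: it calls pearsonr (2*rows-1) * (2*cols-1) times.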
So my approach was to first use the scipy implementation and then adjust the result by a factor that accounts for the different data size caused by lagging the inputs.
import numpy as np
from scipy.signal import correlate2d

def zscore(x):
    return (x - np.mean(x)) / np.std(x)

def corr2d_adjusted(a, b):
    """
    Calculates the 2D cross-correlation of a, b,
    adjusting the result without padding by normalizing to the data overlap size.
    """
    assert a.shape == b.shape
    assert a.ndim == 2
    rows, cols = a.shape
    # the correlation result has shape (2*rows-1, 2*cols-1)
    # correlation is performed for these row & col lags (minimum overlap: 1 row and col)
    row_lags = np.arange(-rows + 1, rows)
    col_lags = np.arange(-cols + 1, cols)
    # the data overlap size is calculated by subtracting |lag| from the number of rows/cols
    rows_lagged = rows - np.abs(row_lags)
    cols_lagged = cols - np.abs(col_lags)
    data_size = rows_lagged.reshape((-1, 1)) * cols_lagged.reshape((1, -1))  # same shape as corr
    # calculate correlation using scipy on the z-scored inputs
    a_ = zscore(a)
    b_ = zscore(b)
    corr = correlate2d(a_, b_)
    # adjust result: divide by the overlap size used at each lag
    corr_adj = corr / data_size
    return corr_adj
However, this performs the zscore calculation only globally, not on each lagged overlap. Note that if data_size were a constant equal to a.size (which is the case only at the center, i.e. zero lag), this would be equivalent to the Pearson correlation.
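This center-equivalence claim can be checked numerically (a small sketch with made-up data):

```python
import numpy as np
from scipy.signal import correlate2d
from scipy.stats import pearsonr

rng = np.random.default_rng(0)
a = rng.normal(size=(6, 8))
b = rng.normal(size=(6, 8))
rows, cols = a.shape

def zscore(x):
    return (x - np.mean(x)) / np.std(x)

# at zero lag the overlap is the full array, so dividing the globally
# z-scored correlation by a.size reproduces the Pearson R exactly
center = correlate2d(zscore(a), zscore(b))[rows - 1, cols - 1] / a.size
print(np.isclose(center, pearsonr(a.ravel(), b.ravel()).statistic))  # True
```

Away from the center the equivalence breaks, because the global mean and std no longer match the mean and std of each overlap region.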
Is there an idea for a fast implementation?
It is working now thanks to @sidhartthhhhh's idea. See the corrected code below:
import numpy as np
from scipy.signal import correlate2d

def fast_corr2d_pearson(a, b):
    """
    Fast 2D Pearson cross-correlation of two arrays using convolution.
    Output shape is (2*rows - 1, 2*cols - 1) like correlate2d with mode='full'.
    """
    assert a.shape == b.shape
    a = a.astype(np.float64)
    b = b.astype(np.float64)
    rows, cols = a.shape
    ones = np.ones_like(a)  # used to count the overlapping elements at each lag
    n = correlate2d(ones, ones)   # number of overlapping bins for each offset
    sum_a = correlate2d(a, ones)  # sum of a in the overlapping region
    sum_b = correlate2d(ones, b)  # sum of b in the overlapping region
    sum_ab = correlate2d(a, b)
    sum_a2 = correlate2d(a**2, ones)
    sum_b2 = correlate2d(ones, b**2)
    numerator = sum_ab - sum_a * sum_b / n
    s_a = sum_a2 - sum_a**2 / n
    s_b = sum_b2 - sum_b**2 / n
    denominator = np.sqrt(s_a * s_b)
    with np.errstate(invalid='ignore', divide='ignore'):
        corr = numerator / denominator
    corr[np.isnan(corr)] = 0
    return corr
Each step can be verified to be correct, e.g. for sum_a:

lag_row, lag_col = 3, 7  # any test case
row_idx, col_idx = rows - 1 + lag_row, cols - 1 + lag_col  # 2D index in the resulting corr matrix
a_lagged = a[lag_row:, lag_col:]  # only works for lag_row > 0, lag_col > 0
sum_a[row_idx, col_idx] == np.sum(a_lagged)
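Beyond the individual sums, the result can also be checked end-to-end against pearsonr for one lag, inlining the same formula (array sizes and lags here are made-up test values):

```python
import numpy as np
from scipy.signal import correlate2d
from scipy.stats import pearsonr

rng = np.random.default_rng(42)
rows, cols = 9, 11
a = rng.normal(size=(rows, cols))
b = rng.normal(size=(rows, cols))
ones = np.ones_like(a)

# same sums-of-products formula as in fast_corr2d_pearson above
n = correlate2d(ones, ones)
num = correlate2d(a, b) - correlate2d(a, ones) * correlate2d(ones, b) / n
s_a = correlate2d(a**2, ones) - correlate2d(a, ones) ** 2 / n
s_b = correlate2d(ones, b**2) - correlate2d(ones, b) ** 2 / n
corr = num / np.sqrt(s_a * s_b)

# compare one lag against the direct (slow) Pearson R on the overlap
lag_row, lag_col = 3, 7
expected = pearsonr(a[lag_row:, lag_col:].ravel(),
                    b[:-lag_row, :-lag_col].ravel()).statistic
print(np.isclose(corr[rows - 1 + lag_row, cols - 1 + lag_col], expected))  # True
```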
Slow code for comparison:

import numpy as np
from scipy.stats import pearsonr
from tqdm import tqdm

rows, cols = data.shape
row_lags = range(-rows + 1, rows)
col_lags = range(-cols + 1, cols)
autocorr = np.empty((len(row_lags), len(col_lags)))
for lag_row in tqdm(row_lags):
    for lag_col in col_lags:
        # create a lagged version of the data
        # slice end points are written as rows-lag / cols-lag so that lag=0 keeps the full axis
        if lag_row >= 0 and lag_col >= 0:
            data_0 = data[lag_row:, lag_col:]
            data_1 = data[:rows - lag_row, :cols - lag_col]
        elif lag_row < 0 and lag_col < 0:
            data_0 = data[:rows + lag_row, :cols + lag_col]
            data_1 = data[-lag_row:, -lag_col:]
        elif lag_row >= 0:  # lag_col < 0
            data_0 = data[lag_row:, :cols + lag_col]
            data_1 = data[:rows - lag_row, -lag_col:]
        else:  # lag_row < 0, lag_col >= 0
            data_0 = data[:rows + lag_row, lag_col:]
            data_1 = data[-lag_row:, :cols - lag_col]
        try:
            corr = pearsonr(data_0.flatten(), data_1.flatten()).statistic
        except ValueError:  # overlap too small for pearsonr
            corr = np.nan
        autocorr[lag_row + rows - 1, lag_col + cols - 1] = corr
This example is for autocorrelation, but the approach works the same for cross-correlation.
Related discussion https://stackoverflow.com/a/51168178