I'm currently trying to implement my own version of 2D Pooling (with channels included) in Python using only NumPy, but I've run into a bit of a roadblock with vectorizing the backpropagation process. The operation I want to do is just to add each of the windows from an as_strided view back into the original location it was taken from.
The windowed view has 5 dimensions, with the shape being (channels, height, width, kernel height, kernel width). The problem that I am facing is that these windows can overlap depending on the stride value, so other solutions that I have seen can cause incorrect values in the finished gradients.
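For reference, here is a minimal standalone sketch of the kind of windowed view I mean (the array x and the sizes below are made up purely for illustration):

import numpy as np

# Illustrative sizes: 2 channels, 6x6 input, 3x3 kernel, stride 1
x = np.arange(2 * 6 * 6, dtype=float).reshape(2, 6, 6)
kh, kw = 3, 3
sh, sw = 1, 1
out_h = (x.shape[1] - kh) // sh + 1
out_w = (x.shape[2] - kw) // sw + 1

cs, hs, ws = x.strides
windows = np.lib.stride_tricks.as_strided(
    x,
    shape=(x.shape[0], out_h, out_w, kh, kw),
    strides=(cs, sh * hs, sw * ws, hs, ws),
)
print(windows.shape)  # (2, 4, 4, 3, 3)
# windows[c, i, j] is the kh-by-kw patch of channel c whose top-left corner is at (i*sh, j*sw)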
Here is the class that I made that includes both the forward and backward passes:
import numpy as np

def maxPool(partition: np.ndarray, backwards: bool = False):
    max_val = np.max(partition)
    if backwards:
        mask = np.zeros(partition.shape, dtype=partition.dtype)
        mask[partition.argmax()] = 1
        return mask
    return max_val

class Pooling2D():
    def __init__(self, pool_funct: callable, kernel_size: tuple[int, int],
                 input_size: tuple[int, int, int], strides: tuple[int, int] = (1, 1), padding: int = 0) -> None:
        self.funct = pool_funct
        self.in_size = input_size
        self.out_size = (input_size[0],
                         (input_size[1] - kernel_size[0] + 2*padding) // strides[0] + 1,
                         (input_size[2] - kernel_size[1] + 2*padding) // strides[1] + 1)
        self.kernel_height, self.kernel_width = kernel_size
        self.window_shape = (*self.out_size,
                             self.kernel_height,
                             self.kernel_width)
        self.stride_h, self.stride_w = strides
        self.padding = padding

    def forward(self, x: np.ndarray) -> np.ndarray:
        x_padded = np.pad(x, pad_width=((0, 0), (self.padding, self.padding), (self.padding, self.padding)), mode='constant')
        strides = (x_padded.strides[0],
                   self.stride_h * x_padded.strides[1],
                   self.stride_w * x_padded.strides[2],
                   x_padded.strides[1],
                   x_padded.strides[2])

        # Split into windows, and apply the pooling function to each window.
        A_s = np.lib.stride_tricks.as_strided(x_padded, shape=self.window_shape, strides=strides)
        pool_val = np.apply_along_axis(self.funct, -1, A_s.reshape(*self.out_size, -1))

        self.last_in = x
        self.last_out = pool_val
        return pool_val

    def backward(self, dx: np.ndarray) -> np.ndarray:
        x_padded = np.pad(self.last_in, ((0, 0), (self.padding, self.padding), (self.padding, self.padding)), mode="constant")
        strides = (x_padded.strides[0],
                   self.stride_h * x_padded.strides[1],
                   self.stride_w * x_padded.strides[2],
                   x_padded.strides[1],
                   x_padded.strides[2])

        # Window frames for previous input / mask creation
        main_windows = np.lib.stride_tricks.as_strided(x_padded, self.window_shape, strides, writeable=False)
        mask = np.apply_along_axis(self.funct, -1, main_windows.reshape(*self.out_size, -1), backwards=True).reshape(main_windows.shape)

        # Use the mask to distribute the gradient into each window (the einsum is a
        # broadcasted multiply of dx over the mask), then reorder and flatten into
        # (channels, kernel height, kernel width, num of windows).
        pre_grad = np.einsum("ghw,ghwxy->ghwxy", dx, mask).transpose(0, 3, 4, 1, 2)
        pre_grad = pre_grad.reshape(*pre_grad.shape[:3], -1)

        # Zero array of the original (padded) size (channels, in height, in width)
        final_grad = np.zeros_like(x_padded)

        # _____________________________________________
        # - - - - - THE PART TO BE VECTORIZED - - - - -
        # _____________________________________________
        right = 0
        down = 0
        for i in range(pre_grad.shape[3]):
            if right + self.kernel_width > x_padded.shape[2]:
                right = 0
                down += self.stride_h
            final_grad[:, down:down+self.kernel_height,
                       right:right+self.kernel_width] += pre_grad[:, :, :, i]
            right += self.stride_w

        return final_grad[:, self.padding:-self.padding, self.padding:-self.padding] if self.padding > 0 else final_grad
My main question is: can I do the last part of the backward pass without any for loops? I wanted a fully vectorized solution, but I haven't been able to find one that works correctly with varying strides, padding, and channel counts.
An example run that shows the expected behaviour:
pool = Pooling2D(pool_funct=maxPool, kernel_size=(3, 3), input_size=(2, 6, 6), strides=(1, 1), padding=0)
A = np.random.randint(3, 10, pool.in_size)
B = np.random.randint(3, 10, pool.out_size)
# Forward pass, should result in a (2, 4, 4) array
result = pool.forward(A)
print(result.shape)
# Backward pass, should result in a (2, 6, 6) array
grad = pool.backward(result - B)
print(grad)
# Where the output should look something like this:
# (2, 4, 4)
# [[[ 0 0 0 0 10 0]
# [ 0 0 0 14 0 0]
# [ 1 0 0 0 0 3]
# [ 0 2 0 4 0 0]
# [ 0 0 0 0 0 0]
# [ 0 4 0 0 1 0]]
#
# [[ 0 0 0 0 0 0]
# [ 0 0 0 14 0 0]
# [ 0 8 0 0 0 0]
# [ 0 0 7 0 0 0]
# [ 0 0 0 9 0 0]
# [ 0 0 0 0 0 0]]]
I also attempted a secondary as_strided operation using the shape of x and a strides tuple reconstructed from x's shape and itemsize, but no matter what I tried it would sometimes place the gradient values in the wrong spots, and it never accumulated overlapping windows.
Here's an example of what I tried before that didn't work properly:
# Get the strides from the original array
_, Hs, Ws = x.strides
new_strides = (Hs*Ws*pre_grad.itemsize, Ws*pre_grad.itemsize, pre_grad.itemsize)
# Do a secondary as_strided
final_grad = np.lib.stride_tricks.as_strided(pre_grad, x.shape, new_strides)
But with this solution, if I have for example a (1, 1) stride and a (3, 3) kernel size, the overlapping values are never added together and they are also not placed in the correct locations.
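As a sanity check on why that approach can't work: as_strided only reinterprets existing memory, it never adds anything together. A tiny 1D toy example (not the pooling code itself, just an illustration):

import numpy as np

a = np.arange(5, dtype=float)  # [0. 1. 2. 3. 4.]
v = np.lib.stride_tricks.as_strided(a, shape=(3, 3), strides=(a.strides[0], a.strides[0]))
print(v)
# [[0. 1. 2.]
#  [1. 2. 3.]
#  [2. 3. 4.]]
# The overlapping entries are the same memory read several times; nothing is summed,
# so re-striding the gradient windows can never merge overlapping contributions.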
I had overlooked the fact that as_strided creates a view of the original array rather than a new copy, so the solution in this case was to add the gradient windows into a windowed view of an empty array and then return the array that the view was built on.
With help from @user2357112, I found that the best replacement solution is:
# Window frames for previous input / mask creation
main_windows = np.lib.stride_tricks.as_strided(x_padded, self.window_shape, strides, writeable=False)
mask = np.apply_along_axis(self.funct, -1, main_windows.reshape(*self.out_size, -1), backwards=True).reshape(main_windows.shape)

# Use the mask to scatter the gradient into each window; pre_grad keeps the full
# windowed shape (channels, out height, out width, kernel height, kernel width).
pre_grad = np.einsum("ghw,ghwxy->ghwxy", dx, mask)
# Zero array of the original (padded) input size (channels, in height, in width)
final_grad = np.zeros_like(x_padded)

# Writable windowed view over final_grad, using the same shape and strides as main_windows.
final_windows = np.lib.stride_tricks.as_strided(final_grad, self.window_shape, strides, writeable=True)

# np.add.at is unbuffered, so elements covered by several overlapping windows
# accumulate instead of being overwritten.
np.add.at(final_windows, slice(None), pre_grad)
And this properly adds all of the windows back into the appropriate spots, including where they overlap.
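A quick way to convince yourself that the overlap is handled is the same 1D toy example as above, but scattering ones back through a writable view with np.add.at (again only an illustration, not the pooling code itself):

import numpy as np

a = np.zeros(5)
windows = np.lib.stride_tricks.as_strided(
    a, shape=(3, 3), strides=(a.strides[0], a.strides[0]), writeable=True)

# np.add.at adds each window element one at a time, so positions covered by
# several overlapping windows accumulate instead of being overwritten.
np.add.at(windows, slice(None), np.ones((3, 3)))
print(a)  # [1. 2. 3. 2. 1.]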