Tags: python, numpy, machine-learning, numpy-ndarray

NumPy Stride Tricks: Is it possible to add the windows back into an array of the original size, at the same locations, without for loops?


I'm currently trying to implement my own version of 2D pooling (with channels included) in Python using only NumPy, but I've run into a roadblock vectorizing the backpropagation step. The operation I want is simply to add each window from an as_strided view back into the original location it was taken from.

The windowed view has 5 dimensions, with shape (channels, height, width, kernel height, kernel width). The problem I'm facing is that these windows can overlap depending on the stride, so other solutions I have seen can produce incorrect values in the finished gradient.
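For concreteness, here's a minimal standalone sketch (separate from my class below, using sliding_window_view from NumPy ≥ 1.20, which builds the same kind of view as as_strided does for a (1, 1) stride) showing that the windows literally share memory with the source array:

import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

a = np.arange(2 * 6 * 6).reshape(2, 6, 6)

# (channels, out height, out width, kernel height, kernel width)
windows = sliding_window_view(a, (3, 3), axis=(1, 2))
print(windows.shape)  # (2, 4, 4, 3, 3)

# Adjacent windows overlap: they view the same underlying memory.
print(np.shares_memory(windows[0, 0, 0], windows[0, 0, 1]))  # True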

Here is the class that I made that includes both the forward and backward passes:

import numpy as np

def maxPool(partition: np.ndarray, backwards: bool = False):
    if backwards:
        # partition arrives flattened (1-D) via np.apply_along_axis, so the
        # flat index returned by argmax can be used directly.
        mask = np.zeros(partition.shape, dtype=partition.dtype)
        mask[partition.argmax()] = 1
        return mask
    return np.max(partition)

class Pooling2D():
    def __init__(self, pool_funct: callable, kernel_size: tuple[int, int], 
                 input_size: tuple[int, int, int], strides: tuple[int, int] = (1, 1), padding: int = 0) -> None:
        self.funct = pool_funct
        self.in_size = input_size
        self.out_size = (input_size[0],
                        (input_size[1] - kernel_size[0] + 2*padding) // strides[0] + 1, 
                        (input_size[2] - kernel_size[1] + 2*padding) // strides[1] + 1)
        self.kernel_height, self.kernel_width = kernel_size
        self.window_shape = (*self.out_size, 
                             self.kernel_height, 
                             self.kernel_width)
        self.stride_h, self.stride_w = strides
        self.padding = padding

    def forward(self, x: np.ndarray) -> np.ndarray:
        x_padded = np.pad(x, pad_width=((0, 0), (self.padding, self.padding), (self.padding, self.padding)), mode='constant')
        strides = (x_padded.strides[0], 
                   self.stride_h * x_padded.strides[1], 
                   self.stride_w * x_padded.strides[2], 
                   x_padded.strides[1], 
                   x_padded.strides[2])

        # Split into windows, and apply the pooling function to each window.
        A_s = np.lib.stride_tricks.as_strided(x_padded, shape=self.window_shape, strides=strides)
        pool_val = np.apply_along_axis(self.funct, -1, A_s.reshape(*self.out_size, -1))

        self.last_in = x
        self.last_out = pool_val
        return pool_val
    
    def backward(self, dx: np.ndarray) -> np.ndarray:
        x_padded = np.pad(self.last_in, ((0, 0), (self.padding, self.padding), (self.padding, self.padding)), mode="constant")
        strides = (x_padded.strides[0], 
                   self.stride_h * x_padded.strides[1], 
                   self.stride_w * x_padded.strides[2], 
                   x_padded.strides[1], 
                   x_padded.strides[2])
        
        # Window frames for the previous input / mask creation
        main_windows = np.lib.stride_tricks.as_strided(x_padded, self.window_shape, strides, writeable=False)
        mask = np.apply_along_axis(self.funct, -1, main_windows.reshape(*self.out_size, -1), backwards=True).reshape(main_windows.shape)

        # Distribute the gradient over each window via the mask, then reshape
        # to (channels, kernel height, kernel width, num of windows)
        pre_grad = np.einsum("ghw,ghwxy->ghwxy", dx, mask).transpose(0, 3, 4, 1, 2)
        pre_grad = pre_grad.reshape(*pre_grad.shape[:3], -1)

        # Zero array of the original padded size (channels, in height, in width)
        final_grad = np.zeros_like(x_padded)

        # _____________________________________________
        # - - - - - THE PART TO BE VECTORIZED - - - - -
        # _____________________________________________
        right = 0
        down = 0
        for i in range(pre_grad.shape[3]):
            if right+self.kernel_width > x_padded.shape[2]:
                right = 0
                down += self.stride_h
            final_grad[:, down:down+self.kernel_height, 
                          right:right+self.kernel_width] += pre_grad[:, :, :, i]
            right += self.stride_w
        
        return final_grad[:, self.padding:-self.padding, self.padding:-self.padding] if self.padding > 0 else final_grad

My main question is: can I do the last part of the backward pass without any for loops? I wanted a fully vectorized solution, but I haven't been able to find one that handles varying strides, padding, and channels.

An example solution for this problem would be:

pool = Pooling2D(pool_funct=maxPool, kernel_size=(3, 3), input_size=(2, 6, 6), strides=(1, 1), padding=0)

A = np.random.randint(3, 10, pool.in_size)
B = np.random.randint(3, 10, pool.out_size)

# Forward pass, should result in a (2, 4, 4) array
result = pool.forward(A)
print(result.shape)

# Backward pass, should result in a (2, 6, 6) array
grad = pool.backward(result - B)
print(grad)

# Where the output should look something like this:
# (2, 4, 4)
# [[[ 0  0  0  0 10  0]
#   [ 0  0  0 14  0  0]
#   [ 1  0  0  0  0  3]
#   [ 0  2  0  4  0  0]
#   [ 0  0  0  0  0  0]
#   [ 0  4  0  0  1  0]]
#
#  [[ 0  0  0  0  0  0]
#   [ 0  0  0 14  0  0]
#   [ 0  8  0  0  0  0]
#   [ 0  0  7  0  0  0]
#   [ 0  0  0  9  0  0]
#   [ 0  0  0  0  0  0]]]

I also attempted a secondary as_strided operation, reconstructing the strides from x's shape and item size, but no matter what I tried it would sometimes place the gradient values in the incorrect spots, and it didn't allow for any sort of overlap.

Here's an example of what I tried before that didn't work properly:

# Reconstruct the strides from the original array's shape and item size
_, Hs, Ws = x.shape
new_strides = (Hs*Ws*pre_grad.itemsize, Ws*pre_grad.itemsize, pre_grad.itemsize)

# Do a secondary as_strided
final_grad = np.lib.stride_tricks.as_strided(pre_grad, x.shape, new_strides)

But with this approach, if I have something like a (1, 1) stride and a (3, 3) kernel size, it doesn't add any of the overlapped values together, and it also doesn't put them in the correct locations.
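To make the failure mode concrete, here's a tiny hypothetical 1-D analogue (not my actual code): ordinary in-place addition through a self-overlapping strided view is buffered, so the overlapping cells typically lose contributions instead of summing them:

import numpy as np
from numpy.lib.stride_tricks import as_strided

grad = np.zeros(4)
# Two overlapping length-3 windows over a length-4 array (stride 1).
windows = as_strided(grad, shape=(2, 3),
                     strides=(grad.strides[0], grad.strides[0]))

# Buffered in-place add: the windows are read before they are written back,
# so this typically prints [1. 1. 1. 1.] instead of the desired [1. 2. 2. 1.].
windows += np.ones((2, 3))
print(grad)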


Solution

  • I had overlooked the fact that as_strided creates a view of the original array rather than a new copy, so the solution in this case was to just add the gradient windows into a strided view of an empty array and then return the underlying array that the view was created from.

    With help from @user2357112, I found that the best replacement solution is:

    # Window frames for the previous input / mask creation
    main_windows = np.lib.stride_tricks.as_strided(x_padded, self.window_shape, strides, writeable=False)
    mask = np.apply_along_axis(self.funct, -1, main_windows.reshape(*self.out_size, -1), backwards=True).reshape(main_windows.shape)

    # Distribute the gradient over each window via the mask; no transpose or
    # reshape is needed here, the shape stays
    # (channels, out height, out width, kernel height, kernel width)
    pre_grad = np.einsum("ghw,ghwxy->ghwxy", dx, mask)
    
    # Zero array of the original padded size (channels, in height, in width)
    final_grad = np.zeros_like(x_padded)
    final_windows = np.lib.stride_tricks.as_strided(final_grad, self.window_shape, strides, writeable=True)

    # ufunc.at performs the addition unbuffered, so overlapping windows
    # accumulate into the same memory location instead of overwriting it.
    np.add.at(final_windows, slice(None), pre_grad)
    

    And this should properly add all of the windows back into the appropriate spots, overlaps included.
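
    As a quick sanity check (a small hypothetical example, separate from the class above), you can verify that np.add.at through the strided view really accumulates the overlaps, precisely because the view shares memory with the zero array:

    import numpy as np
    from numpy.lib.stride_tricks import as_strided

    grad = np.zeros((4, 4))
    # A 2x2 grid of 3x3 windows with stride 1 over the 4x4 array.
    shape = (2, 2, 3, 3)
    strides = (grad.strides[0], grad.strides[1], grad.strides[0], grad.strides[1])
    windows = as_strided(grad, shape, strides)
    print(np.shares_memory(grad, windows))  # True: a view, not a copy

    # Every window adds 1 to each cell it covers.
    np.add.at(windows, slice(None), np.ones(shape))
    print(grad)
    # Each cell now holds the number of windows that cover it:
    # [[1. 2. 2. 1.]
    #  [2. 4. 4. 2.]
    #  [2. 4. 4. 2.]
    #  [1. 2. 2. 1.]]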