pythonnumpyoptimizationvectorization

Working with known indexes on a Numpy Array


I'm working on a Python project that generates random tile maps using NumPy. While my code is functional, it's currently too slow, and I'm struggling to vectorize it effectively to improve performance.

I start with a small grid (e.g., 4x4) and place a few land tiles. The map is then upscaled by subdividing each tile into 4 smaller tiles. For each of the 4 new tiles, the code decides whether to keep it as land or convert it to water. This decision is based on the surrounding tiles: more nearby water tiles increase the chance of converting to water, and more nearby land tiles increase the chance of remaining land. This process results in a fuzzy, nearest-neighbor type of upscaling.

This upscaling process is repeated, progressively increasing the grid size until reaching the desired map size (e.g., from 4x4 to 16x16 to 32x32 and so on). The final output is a landmass with borders that appear naturally textured due to the upscaling process.

During the upscaling process, water tiles cannot turn into land, so they are not checked. Since only land tiles can potentially turn into water, and a tile surrounded entirely by land cannot become water, the code doesn’t need to check each land tile. This reduces the number of tiles that need to be checked. The code tracks border tiles (those adjacent to water) and only update those via "check_tile" method.

However, I'm struggling to vectorize "check_tile" method, which consumes 90% of the execution time effectively with NumPy.

Here is the code (requires numpy version >= 2.0):

import numpy as np


class MapDataGen:
    """
    Procedurally generates a world map and returns a numpy array representation of it.

    Water proceeds from borders to inland.
    Water percentage increases with each iteration.
    """

    def __init__(self, start_size: int, seed: int) -> None:
        """
        Initialize starting world map.

        start_size: size of the map (start_size x start_size).
        seed: used for reproducible randomness.
        """
        # Starting map, filled with 0, start_size by start_size big.
        self.world_map = np.zeros((start_size, start_size), dtype=np.uint8)
        # Random number generator.
        self.rng = np.random.default_rng(seed)
        # List to store border tile indexes.
        self.borders = []
        self.newborders = []

    def add_land(self, land_id, from_index, to_index):
        """
        Add land to the world map at any time and at any map resolution.

        land_id: non-zero uint8, id of the new land tile (0 is reserved for water).
        from_index: starting index (inclusive) for the land area.
        to_index: ending index (exclusive) for the land area.
        """

        row_size, column_size = self.world_map.shape

        from_row = max(0, from_index[0])
        to_row = min(to_index[0], row_size)
        from_col = max(0, from_index[1])
        to_col = min(to_index[1], column_size)

        self.world_map[
            from_row:to_row,
            from_col:to_col,
        ] = land_id

        for row in range(from_row, to_row):
            for column in range(from_col, to_col):
                self.borders.append((row, column))

    def neighbours(self, index, radius) -> np.ndarray:
        """
        Returns neighbour tiles within the given radius of the index.

        index: tuple representing the index of the tile.
        radius: the radius to search for neighbours.
        """

        row_size, column_size = self.world_map.shape
        return self.world_map[
            max(0, index[0] - radius) : min(index[0] + radius + 1, row_size),
            max(0, index[1] - radius) : min(index[1] + radius + 1, column_size),
        ]

    def upscale_map(self) -> np.ndarray:
        """
        Divide each tile into 4 pieces and upscale the map by a factor of 2.

        """
        row, column = self.world_map.shape
        rs, cs = self.world_map.strides
        x = np.lib.stride_tricks.as_strided(
            self.world_map, (row, 2, column, 2), (rs, 0, cs, 0)
        )
        self.newmap = x.reshape(row * 2, column * 2)
        # \/Old version\/.
        # self.newmap = np.repeat(np.repeat(self.worldmap, 2, axis=0), 2, axis=1)

    def check_tile(self, index: tuple):
        """
        Texturize borders and update new borders.

        index: tuple representing the index of the tile to check.
        """
        # Corresponding land id to current working tile.
        tile = self.world_map[index]
        # If tile at the index is surrounded by identical tiles within a 2-tile range.
        if np.all(self.neighbours(index, 2) == tile):
            # Don't store it; this tile cannot become water because it is surrounded by 2-tile wide same land as itself.
            pass
        else:
            # The values of unique tiles and their counts.
            values, counts = np.unique_counts(self.neighbours(index, 1))
            # Calculate the probability of turning into other tiles for descended tiles.
            probability = counts.astype(np.float16)
            probability /= np.sum(probability)
            # Randomly change each of the 4 newly descended tiles of the original tile to either water or not.
            for row in range(2):
                for column in range(2):
                    # One of the four descended tile's index.
                    new_tile_index = (index[0] * 2 + row, index[1] * 2 + column)
                    # Randomly chose a value according to its probability.
                    random = self.rng.choice(values, p=probability)
                    if random == 0:  # If tile at the index became water.
                        # Update it on the new map.
                        self.newmap[new_tile_index] = random
                        # Don't store it because it is a water tile and no longer a border tile.
                    elif random == tile:  # If the tile remained the same.
                        # Store it because it is still a border tile.
                        self.newborders.append(new_tile_index)
                    else:  # If the tile changed to a different land.
                        # Update it on the new map.
                        self.newmap[new_tile_index] = random
                        # Store it because it is still a border tile.
                        self.newborders.append(new_tile_index)

    def default_procedure(self):
        """
        Default procedure: upscale (or zoom into) the map and texturize borders.

        """
        self.upscale_map()
        for index in self.borders:
            self.check_tile(index)
        self.borders = self.newborders
        self.newborders = []
        self.world_map = self.newmap

For to see what it does:

import time
from matplotlib import pyplot as plt
from matplotlib import colors

wmg = MapDataGen(13, 3)
wmg.add_land(1, (1, 1), (7, 7))
wmg.add_land(1, (8, 8), (12, 12))
plt.title(f"Starting Map")
colormap = colors.ListedColormap(
    [[21.0 / 255, 128.0 / 255, 209.0 / 255], [227.0 / 255, 217.0 / 255, 159.0 / 255]]
)
plt.imshow(wmg.world_map, interpolation="nearest", cmap=colormap)
plt.savefig(f"iteration {0}.png")
plt.show()
for i in range(13):
    start = time.time()
    wmg.default_procedure()
    end = time.time()
    plt.title(f"iteration {i+1} took {end-start} seconds")
    plt.imshow(wmg.world_map, interpolation="nearest", cmap=colormap)
    plt.savefig(f"{i}.png")
    plt.show()

Results:

What it does

I checked numpy game of life implementations but they all check all grids, but this code knows which tile indexes to check so they are not applicable.

How can I optimize this algorithm using NumPy, particularly with vectorization, to improve its performance? Are there any specific strategies or functions in NumPy that would help?

Any suggestions or examples would be greatly appreciated!


Solution

  • Note: I kept the explanation to a minimum, but this answer is still long. If you're only looking for the code, just scroll to the end.

    In my opinion, the key to vectorization is to extract the parts that can be vectorized.

    But before that, there is one part that needs to be corrected. The following code is doing something very redundant.

    values, counts = np.unique_counts(self.neighbours(index, 1))
    probability = counts.astype(np.float16)
    probability /= np.sum(probability)
    random = self.rng.choice(values, p=probability)
    

    It can be replaced with the following:

    random = self.rng.choice(self.neighbours(index, 1).ravel())
    

    For example, if there are 9 neighbours and 3 of them are water, the probability of selecting the water will be 3/9. Note, however, that the result will not be the same as the original, even though it does the same thing mathematically.

    Now, since the operation you wanted to perform was randomly select one of the neighbours, you can directly access the target tile like this:

    neighbour_offset = np.array([(dx, dy) for dx in (-1, 0, 1) for dy in (-1, 0, 1)])
    neighbour_index = self.rng.choice(neighbour_offset) + index
    random = self.world_map[tuple(neighbour_index)]
    

    You can see that rng.choice only needs to select one from the neighbour_offset, which does not change during the loop. Therefore, this can be calculated outside the loop. Below are the necessary changes:

        def check_tile(self, index: tuple, neighbour_indexes):
            """
            Texturize borders and update new borders.
    
            index: tuple representing the index of the tile to check.
            """
            # Corresponding land id to current working tile.
            tile = self.world_map[index]
            # If tile at the index is surrounded by identical tiles within a 2-tile range.
            if np.all(self.neighbours(index, 2) == tile):
                # Don't store it; this tile cannot become water because it is surrounded by 2-tile wide same land as itself.
                pass
            else:
                # Randomly change each of the 4 newly descended tiles of the original tile to either water or not.
                for row in range(2):
                    for column in range(2):
                        # One of the four descended tile's index.
                        new_tile_index = (index[0] * 2 + row, index[1] * 2 + column)
                        # Randomly chose a value according to its probability.
                        neighbour_index = neighbour_indexes[row * 2 + column]
                        random = self.world_map[tuple(neighbour_index)]
                        if random == 0:  # If tile at the index became water.
                            # Update it on the new map.
                            self.newmap[new_tile_index] = random
                            # Don't store it because it is a water tile and no longer a border tile.
                        elif random == tile:  # If the tile remained the same.
                            # Store it because it is still a border tile.
                            self.newborders.append(new_tile_index)
                        else:  # If the tile changed to a different land.
                            # Update it on the new map.
                            self.newmap[new_tile_index] = random
                            # Store it because it is still a border tile.
                            self.newborders.append(new_tile_index)
    
        def default_procedure(self):
            """
            Default procedure: upscale (or zoom into) the map and texturize borders.
    
            """
            self.upscale_map()
    
            # Choose 4 random neighbours for each border tile.
            # Note that below is NOT taking into account the case where the land is at the edge of the map.
            neighbour_offset = np.array([(dx, dy) for dx in (-1, 0, 1) for dy in (-1, 0, 1)])
            random_neighbour_offsets = self.rng.choice(neighbour_offset, size=(len(self.borders), 4))
            random_neighbour_indexes = np.asarray(self.borders)[:, None, :] + random_neighbour_offsets
    
            for index, neighbour_index in zip(self.borders, random_neighbour_indexes):
                self.check_tile(index, neighbour_index)
            self.borders = self.newborders
            self.newborders = []
            self.world_map = self.newmap
    

    (Note that in this code, the land tiles at the edge of the map are not taken into account. I have omitted this case here because it would prevent vectorization. If you need to deal with this, I recommend splitting the process into multiple steps and processing the edges separately.)

    With this, rng.choice is now vectorized. On my PC, this step alone made it 8 times faster than the original. You can continue this process repeatedly. I should probably explain everything, but it would be very long, so please let me skip to the end.

    This is the fully vectorized code:

        def default_procedure(self):
            """
            Default procedure: upscale (or zoom into) the map and texturize borders.
    
            """
            self.upscale_map()
    
            def take(arr, indexes):
                return arr[indexes[..., 0], indexes[..., 1]]
    
            def put(arr, indexes, values):
                arr[indexes[..., 0], indexes[..., 1]] = values
    
            land_tile_indexes = np.asanyarray(self.borders)
            land_tiles = take(self.world_map, land_tile_indexes)
    
            # Skip tiles that are surrounded by water.
            neighbour_25_windows = np.lib.stride_tricks.sliding_window_view(
                np.pad(self.world_map, pad_width=5 // 2, mode='edge'),  # Pad the map in order to skip edge cases.
                window_shape=(5, 5),
            )
            neighbour_25_windows = take(neighbour_25_windows, land_tile_indexes)  # Filter only the border tiles.
            is_not_surrounded_tile = np.any(neighbour_25_windows != land_tiles[..., None, None], axis=(-2, -1))
    
            # Filter land_tile_indexes by the condition.
            land_tile_indexes = land_tile_indexes[is_not_surrounded_tile]
    
            # Choose 4 random neighbours for each border tile.
            # Note that below is NOT taking into account the case where the land is at the edge of the map.
            neighbour_offset = np.array([(dx, dy) for dx in (-1, 0, 1) for dy in (-1, 0, 1)])
            random_neighbour_offsets = self.rng.choice(neighbour_offset, size=(len(land_tile_indexes), 4))
            random_neighbour_indexes = land_tile_indexes[:, None, :] + random_neighbour_offsets
            random_neighbour_tiles = take(self.world_map, random_neighbour_indexes)
    
            # Pre-calculate all the new tile indexes here.
            upscaling_offset = np.array([(dx, dy) for dx in range(2) for dy in range(2)])
            new_tile_indexes = (land_tile_indexes * 2)[:, None, :] + upscaling_offset[None]
    
            # Put the new tile if it did NOT remain the same.
            # In other words, this covers below cases:
            #  - If the tile became water.
            #  - If the tile changed to a different land.
            condition = random_neighbour_tiles != take(self.world_map, land_tile_indexes)[..., None]
            put(self.newmap, new_tile_indexes[condition], random_neighbour_tiles[condition])
    
            # Update the border tiles if it did NOT become water.
            # In other words, this covers below cases:
            #  - If the tile remained the same.
            #  - If the tile changed to a different land.
            target_indexes = new_tile_indexes[random_neighbour_tiles != 0]
    
            self.borders = target_indexes
            self.world_map = self.newmap
    

    At this point, the bottleneck is now self.upscale_map(). On my PC, this process took more than 80% of the runtime. The easiest way to solve this problem is to use OpenCV.

        def upscale_map(self) -> None:
            """
            Divide each tile into 4 pieces and upscale the map by a factor of 2.
    
            """
            self.newmap = cv2.resize(self.world_map, (0, 0), fx=2, fy=2, interpolation=cv2.INTER_NEAREST)
    

    Here is the result after 13 iterations.

    Result of iteration 13

    Finally, as an alternative, you could use Numba. With Numba, you might find it more intuitive to implement. However, combining Numba with classes can be a bit tricky, so you might need to rewrite a lot of your code anyway. Still, it’s worth looking into.