Given a positive integer N, we can label all grid points in the square N x N, starting at 1, the total number of grid points is N x N, and the grid points are list(itertools.product(range(1, N + 1), repeat=2))
.
Now, I want to find all tuples (x, y)
that satisfies the condition x/y is a non-reduced fraction, the following is a bruteforce implementation that is guaranteed to be correct, but it is very inefficient:
import math
from itertools import product
def find_complex_points(lim: int) -> list[tuple[int, int]]:
return [
(x, y)
for x, y in product(range(1, lim + 1), repeat=2)
if math.gcd(x, y) > 1
]
Now the next function is slightly smarter, but it generates duplicates and as a result is only noticeably faster but not by much:
def find_complex_points_1(lim: int) -> set[tuple[int, int]]:
lim += 1
return {
(x, y)
for mult in range(2, lim)
for x, y in product(range(mult, lim, mult), repeat=2)
}
In [255]: %timeit find_complex_points(1024)
233 ms ± 4.44 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [256]: %timeit find_complex_points_1(1024)
194 ms ± 1.9 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
Is there a better way to accomplish this?
(My goal is simple, I want to create a NumPy 2D array of uint8 type with shape (N, N), fill it with 255, and make all pixels (x, y) 0 if (x+1)/(y+1) is a non-reduced fraction)
I have devised a method that is smarter than both my previous ones by a wide margin, and also tremendously faster, but it still generates duplicates, I have opt to not to use a set
here so that you can copy-paste the code as is and run some tests and see the exact output in the order they are generated:
def find_complex_points_2(lim: int) -> set[tuple[int, int]]:
stack = dict.fromkeys(range(lim, 1, -1))
lim += 1
points = []
while stack:
x, _ = stack.popitem()
points.append((x, x))
mults = []
for y in range(x * 2, lim, x):
stack.pop(y, None)
mults.append(y)
points.extend([(x, y), (y, x)])
for i, x in enumerate(mults):
points.append((x, x))
for y in mults[i + 1:]:
points.extend([(x, y), (y, x)])
return points
In [292]: sorted(set(find_complex_points_2(1024))) == find_complex_points(1024)
Out[292]: True
In [293]: %timeit find_complex_points_2(1024)
58.9 ms ± 580 μs per loop (mean ± std. dev. of 7 runs, 10 loops each)
In [294]: %timeit find_complex_points(1024)
226 ms ± 3.24 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
To clarify, the output of find_complex_points_2(10)
is:
In [287]: find_complex_points_2(10)
Out[287]:
[(2, 2),
(2, 4),
(4, 2),
(2, 6),
(6, 2),
(2, 8),
(8, 2),
(2, 10),
(10, 2),
(4, 4),
(4, 6),
(6, 4),
(4, 8),
(8, 4),
(4, 10),
(10, 4),
(6, 6),
(6, 8),
(8, 6),
(6, 10),
(10, 6),
(8, 8),
(8, 10),
(10, 8),
(10, 10),
(3, 3),
(3, 6),
(6, 3),
(3, 9),
(9, 3),
(6, 6),
(6, 9),
(9, 6),
(9, 9),
(5, 5),
(5, 10),
(10, 5),
(10, 10),
(7, 7)]
As you can see, (10, 10)
shows up twice. I want to avoid redundant computations.
Also this happens in find_complex_points_1
, if I don't use a set, then many duplicates will be included, because the method used will inevitably generate them repeatedly, by using a set there still is unnecessary computation, it just doesn't collect the duplicates.
And no, I actually want the coordinates to be replaced by the sum of all numbers before it, so N is replaced by (N2 + N) / 2.
I just implemented the image generation to better illustrate what I want:
import numpy as np
import numba as nb
@nb.njit(cache=True)
def resize_img(img: np.ndarray, h_scale: int, w_scale: int) -> np.ndarray:
height, width = img.shape
result = np.empty((height, h_scale, width, w_scale), np.uint8)
result[...] = img[:, None, :, None]
return result.reshape((height * h_scale, width * w_scale))
def find_composite_points(lim: int) -> set[tuple[int, int]]:
stack = dict.fromkeys(range(lim, 1, -1))
lim += 1
points = set()
while stack:
x, _ = stack.popitem()
points.add((x, x))
mults = []
for y in range(x * 2, lim, x):
stack.pop(y, None)
mults.append(y)
points.update([(x, y), (y, x)])
for i, x in enumerate(mults):
points.add((x, x))
for y in mults[i + 1 :]:
points.update([(x, y), (y, x)])
return points
def natural_sum(n: int) -> int:
return (n + 1) * n // 2
def composite_image(lim: int, scale: int) -> np.ndarray:
length = natural_sum(lim)
img = np.full((length, length), 255, dtype=np.uint8)
for x, y in find_composite_points(lim):
x1, y1 = natural_sum(x - 1), natural_sum(y - 1)
img[x1 : x1 + x, y1 : y1 + y] = 0
return resize_img(img, scale, scale)
composite_image(12, 12)
The algorithm of Weeble runs in O(n² m²)
where m
is the size of integers in bits (using a naive multiplication). Since we can assume the multiplication of numbers to be done in constant time (due to bounded native integers used by Numpy), this means O(n²)
but with a significant hidden constant which should not be neglected. Performance wise, the algorithm is bounded by inefficient page fault operations and the filling of big temporary arrays. It is far from being optimal.
The algorithm of Pete Kirkham should run in O(n²)
(hard to prove formally though) with a relatively small hidden constant. This is is a good approach. However, is is very slow because of inefficient scalar Numpy operations instead of vectorised ones. Fortunately, it can be easily vectorised:
array = np.full((N,N), 255, np.uint8)
for d in range(2, N+1):
array[d-1:N:d, d-1:N:d] = 0
Note I corrected the implementation to return correct results (with values 0 and 255).
A very simple alternative solution is just to use Numba so to speed up the code of Pete Kirkham. That being said, the code is not efficient because it iterates on items of different rows in the inner most loop. We can easily fix that by swapping variables:
import numba as nb
@nb.njit('(int32,)')
def compute(N):
array = np.full((N,N), 255, np.uint8)
for denominator in range(2, N+1):
for i in range(denominator, N+1, denominator):
for j in range(denominator, N+1, denominator):
array[i-1, j-1] = 0
return array
Note that the output matrix is symmetric so we don't even need to compute the bottom-left part. Indeed, gcd(a, b) == gcd(b, a)
. Unfortunately, I do not think the can use this property to make the Numpy vectorised code but we can probably make the Numba code faster.
Moreover, the diagonal can be trivially set to 0 (except the first item) since gcd(a, a) == a
so gcd(a, a) > 1
if a > 1
. Technically, we can also trivially set the direct neighbourhood of the diagonal (i.e. array.diagonal(1)
) to 255 since gcd(a, a-1) = 1
. array.diagonal(1)
should be filled with alternating values (i.e. [255, 0, 255, 0, ...]
) since gcd(a, a-2) = gcd(a, 2) = 2 - (a % 2)
. A similar strategy can be applied for array.diagonal(2)
. For other diagonal, it starts to be more complex since we certainly need to factorise numbers. Factorising numbers is known to be expensive, but this cost is amortised here since we need to do that O(n)
times.
Another symmetry of the gcd is gcd(a, b) = gcd(a, b-a) = gcd(a-b, b)
.
We can leverage all these symmetry of the gcd so to write a significantly faster implementation using dynamic programming. A naive implementation (combining all the symmetries rather efficiently) is the following:
@nb.njit('(int32,)')
def compute_v2(n):
arr = np.empty((n,n), np.uint8)
arr[:, 0] = 255
arr[0, :] = 255
for i in range(1, n):
for j in range(1, i):
arr[i, j] = arr[j, i-j-1] # <--- very slow part
arr[i, i] = 0
for j in range(i+1, n):
arr[i, j] = arr[i, j-i-1]
return arr
Unfortunately the transposition is pretty inefficient and take nearly all the time... Optimizing is possible but not easy. We can divide the computation in dependent tiles (similar to how block LU decomposition algorithm work). This makes the code more complex much much faster thanks to a more efficient access pattern:
# Compute the tile arr[start:stop,start:stop].
# Assume arr[:start,:start] has been already computed.
# Assume start and stop are valid.
@nb.njit('(uint8[:,::1], uint32, uint32)', inline='always')
def compute_diag_tile(arr, start, stop):
for i in range(start, stop):
for j in range(start, i):
arr[i, j] = arr[j, i-j-1]
arr[i, i] = 0
for j in range(i+1, stop):
arr[i, j] = arr[i, j-i-1]
# Compute the tile arr[start:stop,stop:].
# Assume arr[start:stop,:stop] has been already computed.
# Assume start and stop are valid.
@nb.njit('(uint8[:,::1], uint32, uint32)', inline='always')
def compute_upper_right_tile(arr, start, stop):
n = np.uint32(arr.shape[1])
for i in range(start, stop):
for j in range(stop, n):
arr[i, j] = arr[i, np.uint64(j-i-1)]
# Compute the tile arr[stop:,start:stop].
# Assume arr[start:stop,stop:] has been already computed; that is to say
# compute_upper_right_tile has been called on the associated diag tile.
# This function transposes the tile written by compute_upper_right_tile.
# Assume start and stop are valid.
@nb.njit('(uint8[:,::1], uint32, uint32)', inline='always')
def compute_bottom_left_tile(arr, start, stop):
n = np.uint32(arr.shape[0])
for i in range(stop, n):
for j in range(start, stop):
arr[i, j] = arr[j, i]
@nb.njit('(uint8[:,::1], uint32, uint32)', inline='always')
def compute_tile_group(arr, start, stop):
compute_diag_tile(arr, start, stop)
compute_upper_right_tile(arr, start, stop)
compute_bottom_left_tile(arr, start, stop)
@nb.njit('(uint32,)')
def compute_v3(n):
chunk_size = 32
arr = np.empty((n, n), np.uint8)
arr[0, :] = 255
arr[:, 0] = 255
for start in range(1, n, chunk_size):
if start + chunk_size <= n:
compute_tile_group(arr, start, start + chunk_size)
else:
compute_tile_group(arr, start, n)
return arr
The transposition is still the slowest part of this code. It can be optimized further but at the expense of a significantly bigger code. I prefer to keep this reasonably simple, but note that one way to make the transposition much faster is to use SIMD intrinsics (certainly at least >2 faster).
Here are results for N=1024
on my machine (i5-9600KF CPU):
find_complex_points: 173 ms
PeteKirkham's code: 108 ms
find_complex_points_1: 99 ms
Weeble's code: 70 ms
find_complex_points_2: 38 ms
Vectorized Numpy code: 4.0 ms <-----
PeteKirkham's code with Numba: 2.5 ms
Numba code `compute_v2`: 0.70 ms
Numba code `compute`: 0.66 ms
Numba code `compute_v3`: 0.45 ms <-----
find_complex_points_3: 0.44 ms
The vectorised Numba code is much faster than the other implementation and the optimised Numba codes outperforms all implementation by a large margin (except the new find_complex_points_3
)!
One can parallelise some of the Numba codes to make it even faster but this is not trivial and it is certainly fast enough anyway, not to mention is will not scale well because the code is rather memory-bound for large N
. Actually, a basic Numpy copy takes about 0.3 ms, which can be considered as a lower bound execution time.