I have a function that ensures uniform sizes for grouped data by padding missing values with a fill_value. The function currently uses a for loop to populate the padded array.
Is there a better way to generate the padded array and get rid of the for loop using NumPy's built-in abilities (edit: "better" with regard to performance and readability)?
Here is the function:
import numpy as np

def ensure_uniform_groups(
        groups: np.ndarray,
        values: np.ndarray,
        fill_value: np.number = np.nan) -> tuple[np.ndarray, np.ndarray]:
    """
    Ensure uniform group lengths by padding each group to the same size.

    Args:
        groups : np.ndarray
            1D array of group identifiers, assumed to be consecutive.
        values : np.ndarray
            2D array of values corresponding to the group identifiers.
        fill_value : np.number, optional
            Value to use for padding groups. Default is np.nan.

    Returns:
        tuple[np.ndarray, np.ndarray]
            A tuple containing uniform groups with padded values.
    """
    # set common type
    dtype = np.result_type(fill_value, values)
    # derive group infos
    n = groups.size
    mask = np.r_[True, groups[:-1] != groups[1:]]
    starts = np.arange(n)[mask]
    ends = np.r_[starts[1:] - 1, n - 1]
    sizes = ends - starts + 1
    max_size = np.max(sizes)
    # check if data is uniform already
    if np.all(sizes == max_size):
        return groups, values
    # generate uniform arrays
    unique_groups = groups[starts]
    full_groups = np.repeat(unique_groups, max_size)
    full_values = np.full((full_groups.shape[0], values.shape[1]),
                          fill_value=fill_value, dtype=dtype)
    for i, (ia, ie) in enumerate(np.column_stack([starts, ends + 1])):
        ua = i * max_size
        ue = ua + ie - ia
        full_values[ua:ue] = values[ia:ie]
    return full_groups, full_values
Here is an example:
groups = np.array([1, 1, 1, 2, 2, 3]) # size by group should be 3
values = np.column_stack([groups*10, groups*100])
fill_value = np.nan
ugroups, uvalues = ensure_uniform_groups(groups, values, fill_value)
out = np.vstack([ugroups, uvalues.T])
print(out)
# [[ 1. 1. 1. 2. 2. 2. 3. 3. 3.]
# [ 10. 10. 10. 20. 20. nan 30. nan nan]
# [100. 100. 100. 200. 200. nan 300. nan nan]]
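For context, here is how the boundary logic at the top of the function behaves on the example data (a standalone illustration, not part of the original function):

```python
import numpy as np

groups = np.array([1, 1, 1, 2, 2, 3])
n = groups.size
# True wherever a new group starts (first element is always a start)
mask = np.r_[True, groups[:-1] != groups[1:]]
starts = np.flatnonzero(mask)
ends = np.r_[starts[1:] - 1, n - 1]
sizes = ends - starts + 1
print(starts)  # [0 3 5]
print(ends)    # [2 4 5]
print(sizes)   # [3 2 1]
```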
Edit: Here is a benchmark which could be used to define "better" with regard to performance:
from timeit import timeit
runs = 10
groups = np.sort(np.random.randint(1, 100, 100_000))
values = np.random.rand(groups.size, 2)
baseline = timeit(lambda: ensure_uniform_groups(groups, values), number=runs)
time_better = timeit(lambda: ensure_uniform_groups_better(groups, values), number=runs)
print("Ratio compared to baseline (>1 is faster)")
print(f"ensure_uniform_groups_better: {baseline/time_better:.2f}")
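One possible candidate for `ensure_uniform_groups_better` is to replace the for loop with a scatter: compute each row's offset within its group, shift it by the group's slot in the padded array, and assign all rows at once. This is a sketch, not the accepted approach; the helper logic mirrors the original function's boundary derivation.

```python
import numpy as np

def ensure_uniform_groups_better(
        groups: np.ndarray,
        values: np.ndarray,
        fill_value: np.number = np.nan) -> tuple[np.ndarray, np.ndarray]:
    dtype = np.result_type(fill_value, values)
    n = groups.size
    mask = np.r_[True, groups[:-1] != groups[1:]]
    starts = np.flatnonzero(mask)
    sizes = np.diff(np.r_[starts, n])
    max_size = sizes.max()
    if np.all(sizes == max_size):
        return groups, values
    full_groups = np.repeat(groups[starts], max_size)
    # position of each input row inside its own group: 0, 1, 2, ...
    within = np.arange(n) - np.repeat(starts, sizes)
    # destination row = group index * max_size + within-group offset
    dest = np.repeat(np.arange(sizes.size) * max_size, sizes) + within
    full_values = np.full((sizes.size * max_size, values.shape[1]),
                          fill_value, dtype=dtype)
    full_values[dest] = values  # scatter all rows in one assignment
    return full_groups, full_values

groups = np.array([1, 1, 1, 2, 2, 3])
values = np.column_stack([groups * 10, groups * 100])
ugroups, uvalues = ensure_uniform_groups_better(groups, values)
print(ugroups)  # [1 1 1 2 2 2 3 3 3]
```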
Here is a way to do it without a loop:
import numpy as np
groups = np.array([1, 1, 1, 2, 2, 3]) # size by group should be 3
values = np.column_stack([groups*10, groups*100]).T
fill_value = np.nan
# Determine number of groups and maximum group size
res = np.unique_counts(groups)  # requires NumPy >= 2.0
n = len(res.values)
max_size = np.max(res.counts)
# Determine columns in which `values` go
i = max_size * np.arange(n)[:, np.newaxis]
j = i + res.counts[:, np.newaxis]
k = np.arange(n*max_size)
mask = np.any((k >= i) & (k < j), axis=0)
# Produce output array
out = np.full((1+values.shape[0], max_size*n), np.nan)
out[0] = np.repeat(res.values, max_size)
out[1:, mask] = values
out
# array([[ 1., 1., 1., 2., 2., 2., 3., 3., 3.],
# [ 10., 10., 10., 20., 20., nan, 30., nan, nan],
# [100., 100., 100., 200., 200., nan, 300., nan, nan]])
Better? Depends on your goals. If this does not meet your objectives, please consider describing them.
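To see what the broadcast mask in this answer actually selects, here is the same computation on the small example in isolation (an illustration of the answer's mask step, with the counts written out by hand):

```python
import numpy as np

counts = np.array([3, 2, 1])               # group sizes from the example
n, max_size = counts.size, counts.max()
i = max_size * np.arange(n)[:, np.newaxis]  # slot where each group starts
j = i + counts[:, np.newaxis]               # one past each group's last value
k = np.arange(n * max_size)
# broadcasting gives an (n, n*max_size) grid; any() collapses it to the
# columns that receive real data rather than fill values
mask = np.any((k >= i) & (k < j), axis=0)
print(mask.astype(int))  # [1 1 1 1 1 0 1 0 0]
```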