I want to add a byte after every 3rd byte in a bytearray.
# This is an example of where the bytes would be added
b'\xF2\xA1\x23{ADD}\xFF\x00\x05{ADD}\xE2\x20\x05{ADD}...'
Usually I have a bytearray made of chunks of 4 bytes each (so that I can split it into multiple uint32 values later). Sometimes the bytearray is made of chunks of 3 bytes each, which cannot be passed as easily into functions. I want to add an "empty" byte after every 3rd byte in the bytearray (efficiently, as there are around 1,000,000 bytes).
The short-term problem is currently an issue with np.frombuffer():
data  # (bytearray; data is either chunks of 3 bytes each or 4 bytes each)
if buffer_size == 3:  # already known
    pass  # here it should extend the smaller bytearray with one empty byte
# NumPy has no `frombytes`; `frombuffer` is the function that reads a bytes-like object.
new_array = np.frombuffer(data, dtype=np.uint32)  # works normally with 4 bytes, fails with 3 bytes
val1, val2 = new_array // 256, new_array % 256
For np.frombuffer, I have adjusted the datatype to allow the file to be read (and automatically reshaped).
# Read the buffer with a (np.uint8, buffer_size) sub-array dtype so that each
# chunk of `buffer_size` bytes becomes one row of the result.
val_list = np.frombuffer(data, dtype=(np.uint8, buffer_size))
# Combine the three bytes of each row into one big-endian integer:
# val_list = val_list[:, 0]*(256*256) + val_list[:, 1]*256 + val_list[:,2]
# NOTE(review): the columns are uint8, so they presumably need widening (e.g.
# `.astype(np.uint32)`) before this arithmetic -- confirm before enabling.
I avoid the problem altogether by splitting the bytearray into strings, but it runs rather slowly:
# Render the buffer as space-separated hex groups, one group per chunk ...
hex_chunks = data.hex(" ", buffer_size).upper().split(" ")
# ... then parse every group back into an integer, element by element.
parse_hex = np.vectorize(int)
new_array = parse_hex(np.array(hex_chunks), 16)
If the input is large, it is more efficient to perform all operations in NumPy, like this:
def pad_to_4bytes(data: bytes | bytearray, buffer_size: int = 3) -> np.ndarray:
assert 1 <= buffer_size < 4
assert isinstance(data, (bytes, bytearray))
assert len(data) % buffer_size == 0
n_chunks = len(data) // buffer_size
# Convert the data into a 2D numpy array, with each row representing a chunk.
chunks = np.frombuffer(data, dtype=np.uint8).reshape(n_chunks, buffer_size)
# The following is an efficient way to pad every 3rd element in numpy.
out = np.zeros((n_chunks, 4), dtype=np.uint8)
assert np.little_endian # Assuming little-endian, based on your example code.
out[:, :buffer_size] = chunks[:, ::-1] # Reverse is for endianness.
# `numpy.view` only affects how an array is interpreted, without copying it.
return out.ravel().view(np.uint32)
Here is a test and benchmark.
import timeit
import numpy as np
def baseline(data: bytes | bytearray) -> np.ndarray:
assert isinstance(data, (bytes, bytearray))
assert len(data) % 3 == 0
buffer_size = 3
new_array = np.array(data.hex(" ", buffer_size).upper().split(" "))
return np.vectorize(int)(new_array, 16).astype(np.uint32)
def pad_to_4bytes(data: bytes | bytearray, buffer_size: int = 3) -> np.ndarray:
assert 1 <= buffer_size < 4
assert isinstance(data, (bytes, bytearray))
assert len(data) % buffer_size == 0
n_chunks = len(data) // buffer_size
# Convert the data into a 2D numpy array, with each row representing a chunk.
chunks = np.frombuffer(data, dtype=np.uint8).reshape(n_chunks, buffer_size)
# The following is an efficient way to pad every 3rd element in numpy.
out = np.zeros((n_chunks, 4), dtype=np.uint8)
assert np.little_endian # Assuming little-endian, based on your example code.
out[:, :buffer_size] = chunks[:, ::-1] # Reverse is for endianness.
# `numpy.view` only affects how an array is interpreted, without copying it.
return out.ravel().view(np.uint32)
def benchmark() -> None:
    """Verify each candidate against ``baseline`` and print its best run time.

    Builds 400,000 random 3-byte chunks from a fixed seed, checks that every
    candidate reproduces the baseline output, then reports the fastest of ten
    single-call timings in milliseconds.
    """
    generator = np.random.default_rng(0)
    random_bytes = generator.integers(0, 256, size=3 * 400_000, dtype=np.uint8)
    payload = bytearray(random_bytes.tobytes())
    reference = baseline(payload)

    for func in (baseline, pad_to_4bytes):
        # Correctness first: a fast but wrong candidate is not worth timing.
        assert np.array_equal(func(payload), reference)
        times = timeit.repeat(lambda: func(payload), number=1, repeat=10)
        print(f"{func.__name__}: {min(times) * 1000:.3f} ms")
# Run the correctness check and timing comparison defined above.
benchmark()
Result:
baseline: 143.055 ms
pad_to_4bytes: 1.499 ms