pythonnumpybyte

Python - Add a byte after every n-th byte in a bytearray


I want to add a byte after every 3rd byte in a bytearray.

# This is an example of where the bytes would be added
b'\xF2\xA1\x23{ADD}\xFF\x00\x05{ADD}\xE2\x20\x05{ADD}...'

A specific problem

Usually I have a bytearray with chunks of 4 bytes each (so when I want to split it into multiple uint32 later). Sometimes the bytearray is a chunk of 3 bytes each, which cannot be passed as easily into functions. I want to add an "empty" byte after every 3rd byte in the bytearray (efficiently, as there are around 1.000.000 bytes)

The short term problem is currently an issue with np.frombytes():

data #(bytearray, data is either chunks of 3 bytes each or 4 bytes each)
if buffer_size == 3: # already known
 pass # here it should extend the smaller bytearray with one empty byte
new_array = np.frombytes(data, dtype=np.uint32) # work normal with 4 bytes, fails with 3 bytes
val1, val2 = new_array//256, new_array%256

A solution

For np.frombuffer, I have adjusted the Datatype to allow the file to be read (and automatically reshaped.

val_list = np.frombuffer(data, dtype=(np.uint8, buffer_size))
# val_list = val_list[:, 0]*(256*256) + val_list[:, 1]*256 + val_list[:,2]

Old alternative

I avoid the problem altogether by splitting the bytearray into strings, but it runs rather slowly:

new_array = np.array(data.hex(" ", buffer_size).upper().split(" "))
new_array = np.vectorize(int)(new_array, 16)

Solution

  • If the input is large, it would be efficient to perform all operations on numpy. Like this:

    def pad_to_4bytes(data: bytes | bytearray, buffer_size: int = 3) -> np.ndarray:
        assert 1 <= buffer_size < 4
        assert isinstance(data, (bytes, bytearray))
        assert len(data) % buffer_size == 0
        n_chunks = len(data) // buffer_size
    
        # Convert the data into a 2D numpy array, with each row representing a chunk.
        chunks = np.frombuffer(data, dtype=np.uint8).reshape(n_chunks, buffer_size)
    
        # The following is an efficient way to pad every 3rd element in numpy.
        out = np.zeros((n_chunks, 4), dtype=np.uint8)
        assert np.little_endian  # Assuming little-endian, based on your example code.
        out[:, :buffer_size] = chunks[:, ::-1]  # Reverse is for endianness.
    
        # `numpy.view` only affects how an array is interpreted, without copying it.
        return out.ravel().view(np.uint32)
    

    Here is a test and benchmark.

    import timeit
    
    import numpy as np
    
    
    def baseline(data: bytes | bytearray) -> np.ndarray:
        assert isinstance(data, (bytes, bytearray))
        assert len(data) % 3 == 0
    
        buffer_size = 3
        new_array = np.array(data.hex(" ", buffer_size).upper().split(" "))
        return np.vectorize(int)(new_array, 16).astype(np.uint32)
    
    
    def pad_to_4bytes(data: bytes | bytearray, buffer_size: int = 3) -> np.ndarray:
        assert 1 <= buffer_size < 4
        assert isinstance(data, (bytes, bytearray))
        assert len(data) % buffer_size == 0
        n_chunks = len(data) // buffer_size
    
        # Convert the data into a 2D numpy array, with each row representing a chunk.
        chunks = np.frombuffer(data, dtype=np.uint8).reshape(n_chunks, buffer_size)
    
        # The following is an efficient way to pad every 3rd element in numpy.
        out = np.zeros((n_chunks, 4), dtype=np.uint8)
        assert np.little_endian  # Assuming little-endian, based on your example code.
        out[:, :buffer_size] = chunks[:, ::-1]  # Reverse is for endianness.
    
        # `numpy.view` only affects how an array is interpreted, without copying it.
        return out.ravel().view(np.uint32)
    
    
    def benchmark() -> None:
        rng = np.random.default_rng(0)
        arr = bytearray(rng.integers(0, 256, size=3 * 400_000, dtype=np.uint8).tobytes())
        expected = baseline(arr)
    
        candidates = [
            baseline,
            pad_to_4bytes,
        ]
    
        for func in candidates:
            assert np.array_equal(func(arr), expected)
            times = timeit.repeat(lambda: func(arr), number=1, repeat=10)
            print(f"{func.__name__}: {min(times) * 1000:.3f} ms")
    
    
    benchmark()
    

    Result:

    baseline: 143.055 ms
    pad_to_4bytes: 1.499 ms