python pandas numpy performance

How to improve pandas-numpy performance in decoding big sequences of bytes


I have to decode and put into pandas dataframes, or at least numpy arrays, dozens of binary files, each containing hundreds of "signals" made of ~10^5 samples of 17 bytes each (time=uint64, value=float64, flags=uint8). The time field is especially challenging because it is represented as the number of 100 ns units since 0001/01/01 00:00:00, so I have to convert it to time_t64 before putting it into a np array.

I came up with "current_implementation" in the code below, but it takes 6.7 sec per 100 "signals" on my machine, and that performance is unsatisfactory. I'm trying to speed up the code with some alternative implementations, but the best case is 5.5 seconds, which is still too much.

Any suggestion?

import pandas as pd
import numpy as np
import timeit
from struct import unpack, iter_unpack

# DELTA_EPOCH is the offset, in 100 ns units, between 0001/01/01 and 1970/01/01.
DELTA_EPOCH = 621_355_968_000_000_000

# MAGIC_NUMBER is the factor that converts a count of 100 ns units into seconds.
MAGIC_NUMBER = 1 / 10_000_000


def current_implementation():
    """Decode the module-level ``signal`` buffer with one ``struct.unpack`` call.

    Reads the globals ``signal`` (packed little-endian ``<QdB`` records) and
    ``nsamples`` (the number of records contained in ``signal``).

    Returns:
        A tuple ``(idx, vl, fl)`` where ``idx`` is the result of
        ``pd.to_datetime`` on the converted timestamps, ``vl`` is a float64
        array of values and ``fl`` is a uint8 array of flags.
    """
    # Decode every record in a single pass.
    fmt = "<" + "QdB" * nsamples
    nums = unpack(fmt, signal)

    # The flat tuple is interleaved as (time, value, flags, time, value, ...),
    # so a stride of 3 separates the three fields into NumPy arrays.
    # The tick counts are kept as integers: they are far larger than 2**53,
    # so going through float64 would round them and lose the 100 ns resolution.
    ticks = np.array(nums[::3], dtype=np.uint64)
    vl = np.array(nums[1::3], dtype=np.float64)
    fl = np.array(nums[2::3], dtype=np.uint8)

    # Shift the origin from 0001/01/01 to 1970/01/01, then scale the 100 ns
    # ticks to nanoseconds (x100) so the result maps directly to datetime64[ns].
    unix_ns = (ticks - np.uint64(DELTA_EPOCH)) * np.uint64(100)
    idx = pd.to_datetime(unix_ns.astype(np.int64), unit='ns')
    return idx, vl, fl


def alternative_implementation1():
    """Decode the module-level ``signal`` buffer into one structured array.

    Uses ``struct.iter_unpack`` to produce one tuple per record and loads the
    tuples into a structured array with fields ``idx``, ``vl`` and ``fl``.

    Returns:
        The structured array. The ``idx`` field is declared as uint64, so the
        converted timestamps are stored back into it as integers.
    """
    record_dtype = [('idx', np.uint64), ('vl', np.float64), ('fl', np.uint8)]

    # iter_unpack yields one (time, value, flags) tuple per 17-byte record,
    # which is the shape a structured array expects.
    records = np.array(list(iter_unpack("<QdB", signal)), dtype=record_dtype)

    # Move the origin to 1970/01/01 and scale 100 ns units to nanoseconds.
    unix_ns = (records['idx'] - DELTA_EPOCH) * 100
    records['idx'] = np.array(unix_ns, dtype='datetime64[ns]')
    return records


def alternative_implementation2():
    """Decode the module-level ``signal`` buffer, keeping timestamps as integers.

    Reads the globals ``signal`` and ``nsamples``, unpacks all records with a
    single ``struct.unpack`` call and converts the timestamps with integer
    arithmetic only.

    Returns:
        A tuple ``(idx, vl, fl)``: a ``pd.DatetimeIndex``, a float64 array of
        values and a uint8 array of flags.
    """
    fields = unpack("<" + "QdB" * nsamples, signal)

    # Fields are interleaved (time, value, flags), hence the stride of 3.
    vl = np.array(fields[1::3], dtype=np.float64)
    fl = np.array(fields[2::3], dtype=np.uint8)

    # Shift the origin to 1970/01/01, then multiply by 100 to convert
    # from 100 ns units to ns.
    since_epoch = np.array(fields[::3], dtype=np.uint64) - DELTA_EPOCH
    stamps = np.array(since_epoch * 100, dtype='datetime64[ns]')

    return pd.DatetimeIndex(stamps), vl, fl


if __name__ == "__main__":

    # Build the test signal: one 17-byte sample repeated nsamples times
    nsamples = 300000
    signal = b'\xcd\xf1\xb9!\x18\xbb\xda\x08\x00\x00\x00\x80\x01\xc84@\x03' * nsamples

    # (label to print, statement to time); each statement is run 100 times
    runs = (
        ("Current implementation", 'current_implementation()'),
        ("alternative_implementation1", 'alternative_implementation1()'),
        ("alternative_implementation2", 'alternative_implementation2()'),
    )
    for label, stmt in runs:
        print(f"\n{label} ", end="")
        print(timeit.timeit(stmt, number=100, globals=globals()))

Solution

  • No need to do anything fancy. Use np.frombuffer in alternative_implementation1.

    def alternative_implementation1b():
        """Decode the module-level ``signal`` buffer without copying the payload.

        ``np.frombuffer`` reinterprets the bytes in place as an array of
        17-byte records with fields ``idx``, ``vl`` and ``fl``.

        Returns:
            A tuple ``(ts, vl, fl)``: a ``datetime64[ns]`` array, and the
            ``vl`` / ``fl`` fields of the record array (views, not copies).
        """
        # The byte order is spelled out as little-endian so the layout matches
        # the "<QdB" struct format regardless of the host's native byte order.
        record = np.dtype([("idx", "<u8"), ("vl", "<f8"), ("fl", "u1")])
        tmp = np.frombuffer(signal, dtype=record)

        # Shift the origin to 1970/01/01 and scale 100 ns units to nanoseconds.
        ts = np.array((tmp["idx"] - DELTA_EPOCH) * 100, dtype="datetime64[ns]")
        vl = tmp["vl"]
        fl = tmp["fl"]
        return ts, vl, fl
    

    It may be necessary to note that vl and fl are not contiguous in memory and are read-only in this implementation. If that is a problem, you can simply copy them. It will still be fast enough.

    def alternative_implementation1b_with_copy():
        """Decode the module-level ``signal`` buffer and return contiguous arrays.

        ``np.frombuffer`` reinterprets the bytes as an array of 17-byte records
        with fields ``idx``, ``vl`` and ``fl``; the value and flag fields are
        then passed through ``np.ascontiguousarray``.

        Returns:
            A tuple ``(ts, vl, fl)``: a ``datetime64[ns]`` array plus
            contiguous arrays of values and flags.
        """
        # The byte order is spelled out as little-endian so the layout matches
        # the "<QdB" struct format regardless of the host's native byte order.
        record = np.dtype([("idx", "<u8"), ("vl", "<f8"), ("fl", "u1")])
        tmp = np.frombuffer(signal, dtype=record)

        # Shift the origin to 1970/01/01 and scale 100 ns units to nanoseconds.
        ts = np.array((tmp["idx"] - DELTA_EPOCH) * 100, dtype="datetime64[ns]")

        # Field views are strided across the 17-byte records; make them contiguous.
        vl = np.ascontiguousarray(tmp["vl"])
        fl = np.ascontiguousarray(tmp["fl"])
        return ts, vl, fl
    

    Benchmark:

    if __name__ == "__main__":
        # Build the test signal: one 17-byte sample repeated nsamples times
        nsamples = 300000
        signal = b"\xcd\xf1\xb9!\x18\xbb\xda\x08\x00\x00\x00\x80\x01\xc84@\x03" * nsamples

        candidates = [
            current_implementation,
            alternative_implementation1,
            alternative_implementation2,
            current_implementation_2,
            alternative_implementation1b,
            alternative_implementation1b_with_copy,
        ]
        # Width of the longest function name, used to align the output column
        width = len(max((func.__name__ for func in candidates), key=len))

        for func in candidates:
            # Three rounds of 100 calls each; report the fastest round
            best = min(timeit.repeat(func, number=100, repeat=3))
            print(f"{func.__name__:{width}}: {best}")
    

    Result:

    current_implementation                : 9.686750392982503
    alternative_implementation1           : 15.24386641397723
    alternative_implementation2           : 8.135940829990432
    current_implementation_2              : 0.8789778979844414
    alternative_implementation1b          : 0.05908472600276582
    alternative_implementation1b_with_copy: 0.0940033549850341