c++correlationsimdavxinner-product

Inner product of two unsigned byte vectors using AVX2 in C/C++


I'd like to implement a fast correlation coefficient computation using SSE/AVX2. The operands are two unsigned char vectors. The function should be mostly equivalent to this:

float correlate_simple(const unsigned char* vec1, const unsigned char* vec2, size_t length)
{
    int sum1 = 0;
    int sum2 = 0;
    int sum11 = 0;
    int sum22 = 0;
    int sum12 = 0;

    for (size_t i = length; i > 0; --i, ++vec1, ++vec2) {
        sum1 += *vec1;
        sum2 += *vec2;
        sum11 += *vec1  * *vec1;
        sum22 += *vec2  * *vec2;
        sum12 += *vec1  * *vec2;
    }
    double mean1 = double(sum1) / double(length);
    double mean2 = double(sum2) / double(length);
    double mean11 = double(sum11) / double(length);
    double mean22 = double(sum22) / double(length);
    double mean12 = double(sum12) / double(length);

    double b = (mean11 - mean1 * mean1) * (mean22 - mean2 * mean2);
    if (b <= 0.0)
        return 0.0f;
    double a = (mean12 - mean1 * mean2);

    return float(a / sqrt(b));
}

The parameter length ranges from 1 to less than 1000.

In order to do this, I researched on how to implement an inner product of two unsigned byte arrays. However, I could not come up with a solution that does not involve converting all the unsigned 8 bit values to signed 16 bit values.

The intrinsic _mm256_maddubs_epi16(a, b) expects b to be a signed byte. This would not be a problem in this case since subtracting some constant (here: 127) from b does not change the correlation coefficient. Unfortunately I could not find an intrinsic that would allow me to subtract 127 from unsigned bytes producing signed bytes (not relying on some two's complement magic).

// vec: const unsigned char*
auto x = _mm256_load_si256((const __m256i*) vec);
auto v = _mm256_set1_epi8(127);

// wrong if vec[i] is less than 127:
auto x_centered = _mm256_sub_epi8 (x, v);

What would be the best approach here to compute inner products (and finally a correlation coefficient)?

Addendum:

Below is my current implementation of a pure inner product. I decided to convert to 16 bit integer to avoid overflow errors. Update: Changed from reading 128 bits to 256 bits at a time.

int accumulate_i32(__m256i x)
{
    auto tmp1 = _mm256_srli_si256(x, 8);
    x = _mm256_add_epi32(x, tmp1);
    auto tmp2 = _mm256_extractf128_si256(x, 1);
    tmp2 = _mm_add_epi32(tmp2, _mm256_castsi256_si128(x));
    
    return _mm_cvtsi128_si32(tmp2) + _mm_extract_epi32(tmp2, 1);
}

int inner_product_avx(const unsigned char* vec1, const unsigned char* vec2, unsigned int length)
{    
    constexpr unsigned int memoryAlignmentBytes = 32;
    constexpr unsigned int bytesPerPack = 256 / 8;

    assert((reinterpret_cast<std::uintptr_t>(vec1) % memoryAlignmentBytes) == 0);
    assert((reinterpret_cast<std::uintptr_t>(vec2) % memoryAlignmentBytes) == 0);    
    

    // compute middle part via AVX2    
    unsigned int packCount = length / bytesPerPack;
    const __m256i zeros = _mm256_setzero_si256();
    auto sumlo = _mm256_setzero_si256();
    auto sumhi = _mm256_setzero_si256();

    for (unsigned int packIdx = 0; packIdx < packCount; ++packIdx) {
        auto x1 = _mm256_load_si256((const __m256i*)vec1);
        auto x2 = _mm256_load_si256((const __m256i*)vec2);

        auto x1lo = _mm256_unpacklo_epi8(x1, zeros);
        auto x1hi = _mm256_unpackhi_epi8(x1, zeros);
        auto x2lo = _mm256_unpacklo_epi8(x2, zeros);
        auto x2hi = _mm256_unpackhi_epi8(x2, zeros);
        
        auto tmplo = _mm256_madd_epi16(x1lo, x2lo);
        auto tmphi = _mm256_madd_epi16(x1hi, x2hi);

        sumlo = _mm256_add_epi32(sumlo, tmplo);
        sumhi = _mm256_add_epi32(sumhi, tmphi);

        vec1 += bytesPerPack;
        vec2 += bytesPerPack;
    }

    int sum = accumulate_i32(sumlo) + accumulate_i32(sumhi);

    
    // compute remaining part that cannot be represented as a 
    // whole packed integer    
    unsigned int packRestCount = length % bytesPerPack;
    for (size_t i = packRestCount; i > 0; --i, ++vec1, ++vec2)
        sum += int(*vec1) * int(*vec2);

    
    return sum;
}

This takes roughly 20 % of the time of the simple C++ implementation (see below). Considering the fact that the AVX code works on 16 16-bit integers simultaneously, I would have expected a higher gain. - Is this reasonable or did I miss something?

Unrolling the last loop in the AVX code did not descrease computation time.

int inner_product_simple(const unsigned char* vec1, const unsigned char* vec2, size_t length)
{
    int sum = 0;

    for (size_t i = length; i > 0; --i, ++vec1, ++vec2)
        sum += int(*vec1) * int(*vec2);

    return sum;
}

Solution

  • I would start from something like that. It uses 32-bit accumulators just like your current code. Untested.

    namespace
    {
        // Compute sum of the 64-bit lanes, convert to double
        inline double hadd_epi64( const __m256i i64 )
        {
            __m128i res = _mm256_castsi256_si128( i64 );
            res = _mm_add_epi64( res, _mm256_extractf128_si256( i64, 1 ) );
            res = _mm_add_epi64( res, _mm_unpackhi_epi64( res, res ) );
            return (double)_mm_cvtsi128_si64( res );
        }
    
        // Convert 32-bit lanes into 64-bit, compute sum of the 8, convert to double
        inline double hadd_epu32( __m256i x )
        {
            const __m256i zero = _mm256_setzero_si256();
            __m256i i64 = _mm256_unpacklo_epi32( x, zero );
            i64 = _mm256_add_epi64( i64, _mm256_unpackhi_epi32( x, zero ) );
            return hadd_epi64( i64 );
        };
    }
    
    class InnerProduct
    {
        // These fields are interpreted as 64-bit integers
        __m256i a, b;
        // These fields are interpreted as 32-bit integers
        __m256i aa, bb, ab;
    
        // Accumulate products of 16-bit numbers, 16 of them at once
        inline void add16( __m256i x, __m256i y )
        {
            const __m256i x2 = _mm256_madd_epi16( x, x );
            const __m256i y2 = _mm256_madd_epi16( y, y );
            const __m256i prod = _mm256_madd_epi16( x, y );
    
            aa = _mm256_add_epi32( aa, x2 );
            bb = _mm256_add_epi32( bb, y2 );
            ab = _mm256_add_epi32( ab, prod );
        }
    
    public:
    
        InnerProduct()
        {
            a = b = aa = bb = ab = _mm256_setzero_si256();
        }
    
        // Handle 32 bytes
        inline void addBytes( __m256i x, __m256i y )
        {
            // Accumulate values
            const __m256i zero = _mm256_setzero_si256();
            a = _mm256_add_epi64( a, _mm256_sad_epu8( x, zero ) );
            b = _mm256_add_epi64( b, _mm256_sad_epu8( y, zero ) );
    
            // Split the vectors into 2 sets of 16-bit numbers, accumulate products
            const __m256i z = _mm256_unpacklo_epi8( x, zero );
            const __m256i w = _mm256_unpacklo_epi8( y, zero );
            add16( z, w );
    
            x = _mm256_unpackhi_epi8( x, zero );
            y = _mm256_unpackhi_epi8( y, zero );
            add16( x, y );
        }
    
        // Compute the result
        float compute( size_t count ) const
        {
            const double div = (double)count;
            const double mean1 = hadd_epi64( a ) / div;
            const double mean2 = hadd_epi64( b ) / div;
            const double mean11 = hadd_epu32( aa ) / div;
            const double mean22 = hadd_epu32( bb ) / div;
            const double mean12 = hadd_epu32( ab ) / div;
    
            const double b = ( mean11 - mean1 * mean1 ) * ( mean22 - mean2 * mean2 );
            if( b <= 0 )
                return 0;
            const double a = ( mean12 - mean1 * mean2 );
            return float( a / sqrt( b ) );
        }
    };
    
    // Load 1-31 bytes into AVX register, zero out unused higher bytes
    inline __m256i loadPartial( const uint8_t* p, size_t length )
    {
        alignas( 32 ) std::array<uint8_t, 32> arr{};
        memcpy( arr.data(), p, length );
        return _mm256_load_si256( ( const __m256i* )arr.data() );
    }
    
    float correlate_simple( const uint8_t* vec1, const uint8_t* vec2, const size_t length )
    {
        InnerProduct ip;
        const uint8_t* const vec1End = vec1 + ( ( length / 32 ) * 32 );
        for( ; vec1 < vec1End; vec1 += 32, vec2 += 32 )
        {
            const __m256i x = _mm256_loadu_si256( ( const __m256i * )vec1 );
            const __m256i y = _mm256_loadu_si256( ( const __m256i * )vec2 );
            ip.addBytes( x, y );
        }
        const size_t remainder = length % 32;
        if( remainder > 0 )
        {
            const __m256i x = loadPartial( vec1, remainder );
            const __m256i y = loadPartial( vec2, remainder );
            ip.addBytes( x, y );
        }
        return ip.compute( length );
    }