c++ · parallel-processing · eigen

Parallelize filling of Eigen Matrix in C++


Let's say that I am trying to fill a MatrixXf with the rolling mean of another MatrixXf, like this:

MatrixXf ComputeRollingMean(const MatrixXf& features) {
    MatrixXf df(features.rows() - 30, features.cols());

    for (size_t i = 30; i < features.rows(); ++i) {
            MatrixXf mat = features.block(i-30, 0, 30, 3);
            RowVectorXf mean_vector = mat.colwise().mean();
            RowVectorXf stdev_vector = ((mat.rowwise() - mean_vector).colwise().squaredNorm() / (mat.rows()-1)).cwiseSqrt();
            MatrixXf zscores = (mat.rowwise() - mean_vector).array().rowwise() / stdev_vector.array();
            RowVectorXf zscore = zscores(last, all);
            df.block(index, 0, 1, 3) = zscore;
    }
    return df;
}

It seems that Eigen has some sort of parallelism support:

#include <Eigen/Core>

and then

int main() {
    Eigen::initParallel();
    ...}

But I am not sure how this can parallelize the for loop in my ComputeRollingMean function. Does anyone know how I could parallelize it, and maybe choose the number of processes I want to start?


Solution

  • What you need is not parallelization but simple optimization. As a rule of thumb, you should never parallelize before you have exhausted all reasonable improvements to the serial code. Exceptions may apply to massively parallel architectures like GPUs but even they usually profit from fast serial computational kernels.

    Basic code cleanup

    There are a couple of issues with your code:

    - index in df.block(index, 0, 1, 3) is never defined; from the loop it should be i - 30.
    - You compute z-scores for the entire 30-row block even though only its last row ends up in the output. Computing just that single row's z-score avoids almost all of the work.

    Minor issues are:

    - The loop variable is size_t while features.rows() returns the signed Eigen::Index; mixing them invites signed/unsigned comparison warnings.
    - The block width is hard-coded to 3 instead of using features.cols(), as is the magic number 30, which deserves a named constant.
    - mat, zscores, and the row vectors are freshly allocated in every iteration; referencing the block directly and hoisting the vectors out of the loop avoids those allocations.
    - The standard deviation can be formed with norm() and a precomputed 1 / sqrt(windowsize - 1) factor instead of squaredNorm(), a division, and cwiseSqrt().
    - The function computes a rolling z-score, not a rolling mean, so the name is misleading.

    An improved version may look like this:

    #include <Eigen/Core>
    #include <cmath>

    Eigen::MatrixXf ComputeZscore(const Eigen::MatrixXf& features) {
        constexpr int windowsize = 30;
        Eigen::MatrixXf df(features.rows() - windowsize, features.cols());
        const float stdfactor = static_cast<float>(1. / std::sqrt(windowsize - 1));
        Eigen::RowVectorXf mean_vector, stdev_vector;
        for(Eigen::Index i = windowsize; i < features.rows(); ++i) {
            const auto& block = features.middleRows(i - windowsize, windowsize);
            mean_vector = block.colwise().mean();
            stdev_vector = (block.rowwise() - mean_vector)
                    .colwise().norm() * stdfactor;
            df.row(i - windowsize) = (block.row(windowsize - 1) - mean_vector)
                    .array() / stdev_vector.array();
        }
        return df;
    }
    

    Note Eigen's warning about using auto with its expression templates. The use here (a const reference to a block expression on features, which outlives the loop) is okay.
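
    A quick illustration of the pitfall that warning refers to, as a hypothetical sketch with made-up variables (not code from this answer): with auto you get an unevaluated expression template rather than a result.

    #include <Eigen/Core>

    void AutoPitfall(const Eigen::MatrixXf& a, const Eigen::MatrixXf& b,
            const Eigen::VectorXf& v, const Eigen::VectorXf& w) {
        auto expr = a + b;              // expression template; nothing computed yet
        Eigen::VectorXf r1 = expr * v;  // (a + b) is evaluated here ...
        Eigen::VectorXf r2 = expr * w;  // ... and evaluated again here
        Eigen::MatrixXf sum = a + b;    // assigning to a matrix evaluates once
        Eigen::VectorXf r3 = sum * v;   // reuses the stored result
        // The dangerous case is an expression referencing a destroyed temporary,
        // e.g. auto e = (a + b).eval() + a; e dangles after that statement.
    }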

    This code already runs about twice as fast in my tests.
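
    If you want to reproduce such a comparison, a minimal timing harness could look like the sketch below (hypothetical; not the setup behind the numbers quoted here):

    #include <chrono>
    #include <iostream>
    #include <Eigen/Core>

    Eigen::MatrixXf ComputeZscore(const Eigen::MatrixXf& features); // version above

    int main() {
        const Eigen::MatrixXf features = Eigen::MatrixXf::Random(1000000, 3);
        const auto start = std::chrono::steady_clock::now();
        const Eigen::MatrixXf df = ComputeZscore(features);
        const auto stop = std::chrono::steady_clock::now();
        // Print a checksum so the compiler cannot discard the computation
        std::cout << std::chrono::duration<double>(stop - start).count()
                << " s, checksum " << df.sum() << '\n';
    }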

    Fixed column number

    It looks like your matrix has only three columns, so the row vectors are tiny as well. If the size is known at compile time, we can save dynamic allocations and allow the compiler to optimize the code better. We can make this decision via a macro so that it can be deactivated.

    #define OPTIMIZE_FIXED_COLS 1
    
    #if OPTIMIZE_FIXED_COLS
    constexpr Eigen::Index FeatureCols = 3;
    #else
    constexpr Eigen::Index FeatureCols = Eigen::Dynamic;
    #endif
    
    // Same as MatrixXf or MatrixX3f
    using MatrixType = Eigen::Matrix<float, Eigen::Dynamic, FeatureCols>;
    // Same as RowVectorXf or RowVector3f
    using RowVectorType = Eigen::Matrix<float, 1, FeatureCols>;
    
    MatrixType ComputeZscore(const MatrixType& features) {
        constexpr int windowsize = 30;
        MatrixType df(features.rows() - windowsize, features.cols());
        const float stdfactor = static_cast<float>(1. / std::sqrt(windowsize - 1));
        RowVectorType mean_vector, stdev_vector;
        for(Eigen::Index i = windowsize; i < features.rows(); ++i) {
            const auto& block = features.middleRows(i - windowsize, windowsize);
            mean_vector = block.colwise().mean();
            stdev_vector = (block.rowwise() - mean_vector)
                    .colwise().norm() * stdfactor;
            df.row(i - windowsize) = (block.row(windowsize - 1) - mean_vector)
                    .array() / stdev_vector.array();
        }
        return df;
    }
    

    This gives another 14% speedup.

    Algorithmic improvements

    Your algorithm depends on a rolling mean and standard deviation. If you have the mean over the range x[i, i + windowsize), you can calculate the mean for the range x[i + 1, i + windowsize + 1) simply by subtracting x[i] / windowsize and adding x[i + windowsize] / windowsize. That reduces the number of additions per output row from 30 to 2.
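
    In scalar form, the sliding-mean update looks like this (a sketch with illustrative names):

    #include <cstddef>
    #include <vector>

    // O(1) sliding-window mean update for a scalar series x:
    // 'mean' covers x[i, i + windowsize); the returned value covers
    // x[i + 1, i + windowsize + 1)
    float SlideMean(float mean, const std::vector<float>& x,
            std::size_t i, std::size_t windowsize) {
        return mean + (x[i + windowsize] - x[i]) / static_cast<float>(windowsize);
    }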

    For computing a rolling variance, there is an entire Wikipedia article (Algorithms for calculating variance) with sample code, plus a StackOverflow answer on the same topic.
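
    Concretely, the update used in the code below maintains the sum of squared deviations (var_sum) and slides it along with the mean:

    var_sum_new = var_sum_old
            + (x_new - mean_old) * (x_new - mean_new)
            - (x_old - mean_old) * (x_old - mean_new)

    Here x_new enters the window, x_old leaves it, and mean_old / mean_new are the window means before and after the shift. Dividing var_sum by (windowsize - 1) yields the sample variance.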

    Adapting these, we get:

    MatrixType ComputeZscore(const MatrixType& features) {
        constexpr int windowsize = 30;
        const Eigen::Index out_rows = features.rows() - windowsize;
        if(out_rows < 1)
            return MatrixType(0, features.cols());
        MatrixType df(out_rows, features.cols());
        const float mean_factor = 1.f / windowsize;
        const float var_factor = 1.f / (windowsize - 1);
        RowVectorType mean[2], var_sum;
        { // initialize state with first row
            const auto& block = features.topRows(windowsize);
            mean[0] = block.colwise().sum() * mean_factor;
            // actually variance
            var_sum = (block.rowwise() - mean[0]).colwise().squaredNorm() * var_factor;
            const auto& sample_row = features.row(windowsize - 1);
            df.row(0) = (sample_row - mean[0]).array() / var_sum.array().sqrt();
            var_sum *= float(windowsize - 1); // variance to var_sum
        }
        for(Eigen::Index i = 1; i < out_rows; ++i) {
        RowVectorType& old_mean = mean[(i & 1) ^ 1];
        RowVectorType& new_mean = mean[i & 1];
            const auto& new_x = features.row(windowsize + i - 1);
            const auto& old_x = features.row(i - 1);
            new_mean = old_mean + mean_factor * (new_x - old_x);
            var_sum = var_sum.array()
                    + (new_x - old_mean).array() * (new_x - new_mean).array()
                    - (old_x - old_mean).array() * (old_x - new_mean).array();
            const auto& sample_row = new_x;
            df.row(i) = (sample_row - new_mean).array() / (
                    var_sum * var_factor /*->variance*/).array().sqrt();
        }
        return df;
    }
    

    In my tests, this runs more than 10 times faster than the previous version.

    Parallelization

    Parallelizing now is a bit trickier than before, since our primary loop always depends on the result of the previous iteration. If we had a sufficiently large number of columns, we could simply let each thread handle one column or a small segment of columns. However, your sample has only 3. Instead, we can let each thread process a block of output rows, with each thread initializing its own rolling state from the first window of its block.

    #include <omp.h>
    // using omp_get_num_threads, omp_get_thread_num
    
    void computeZscoreSegment(const MatrixType& features, MatrixType& df,
            Eigen::Index first_row, Eigen::Index out_rows) {
        if(! out_rows)
            return;
        constexpr int windowsize = 30;
        const float mean_factor = 1.f / windowsize;
        const float var_factor = 1.f / (windowsize - 1);
        RowVectorType mean[2], var_sum;
        {
            const auto& block = features.middleRows(first_row, windowsize);
            mean[0] = block.colwise().sum() * mean_factor;
            // actually variance
            var_sum = (block.rowwise() - mean[0]).colwise().squaredNorm()
                    * var_factor;
            const auto& sample_row = features.row(first_row + windowsize - 1);
            df.row(first_row) = (sample_row - mean[0]).array()
                    / var_sum.array().sqrt();
            var_sum *= float(windowsize - 1); // variance to var_sum
        }
        for(Eigen::Index i = 1; i < out_rows; ++i) {
        RowVectorType& old_mean = mean[(i & 1) ^ 1];
        RowVectorType& new_mean = mean[i & 1];
            const auto& new_x = features.row(first_row + windowsize + i - 1);
            const auto& old_x = features.row(first_row + i - 1);
            new_mean = old_mean + mean_factor * (new_x - old_x);
            var_sum = var_sum.array()
                    + (new_x - old_mean).array() * (new_x - new_mean).array()
                    - (old_x - old_mean).array() * (old_x - new_mean).array();
            const auto& sample_row = new_x;
            df.row(first_row + i) = (sample_row - new_mean).array() / (
                    var_sum * var_factor /*->variance*/).array().sqrt();
        }
    }
    MatrixType ComputeZscore(const MatrixType& features) {
        constexpr int windowsize = 30;
        const Eigen::Index out_rows = features.rows() - windowsize;
        if(out_rows < 1)
            return MatrixType(0, features.cols());
        MatrixType df(out_rows, features.cols());
    #   pragma omp parallel
        {
            // distribute rows evenly across threads
            const int threads = omp_get_num_threads();
            const int thread_num = omp_get_thread_num();
            Eigen::Index thread_rows = out_rows / threads;
            const int distr_modulo = out_rows % threads;
            const Eigen::Index own_start = thread_rows * thread_num
                    + std::min(thread_num, distr_modulo);
            thread_rows += thread_num < distr_modulo;
            computeZscoreSegment(features, df, own_start, thread_rows);
        }
        return df;
    }
    

    On my system I get another 4 times speedup with 6 cores / 12 threads, but this will very much depend on your hardware. I expect it to become memory-bound relatively quickly.

    Compile flags used for all versions: -O3 -march=native -fopenmp -DNDEBUG -fno-math-errno -fno-trapping-math
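
    Since you also asked how to choose the number of threads: with OpenMP you can set the OMP_NUM_THREADS environment variable, call omp_set_num_threads(), or attach a num_threads clause to the parallel region. A minimal sketch:

    #include <omp.h>

    void ChooseThreadCount() {
        // Global default for subsequent parallel regions (equivalently,
        // set OMP_NUM_THREADS=4 in the environment before launching):
        omp_set_num_threads(4);
        // Or override it for one region only:
#   pragma omp parallel num_threads(4)
        {
            // The row distribution in ComputeZscore adapts automatically
            // because it queries omp_get_num_threads() at runtime.
        }
    }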

    Transposed input/output

    Consider transposing the input / output matrix. Since Eigen's storage is column-major by default, matrix.col(i) is more efficient than matrix.row(i). Right now with only 3 columns, this doesn't seem to matter at all. However, if you increase the number of columns, it will become significant.

    In particular, going from 3 to 4 columns and transposing makes the code almost 60% faster in my tests. This is because all computations then happen on vectors that match the size of the CPU's vector registers 1-to-1 (4 floats per XMM register, or 4 doubles per YMM register if you go to higher precision). Even if you don't use that fourth entry and simply pad the input to the next multiple of 4, it will be worth it.
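
    A sketch of what the transposed layout could look like (illustrative names, not part of the code above):

    #include <Eigen/Core>

    // One column per time step, one row per feature. With Eigen's default
    // column-major storage, .col(i) is then a contiguous access, and padding
    // the feature count from 3 to 4 fills a whole XMM register (4 floats).
    constexpr Eigen::Index PaddedFeatureRows = 4;
    using MatrixTType = Eigen::Matrix<float, PaddedFeatureRows, Eigen::Dynamic>;
    using ColVectorType = Eigen::Matrix<float, PaddedFeatureRows, 1>;

    ColVectorType WindowMean(const MatrixTType& features_t,
            Eigen::Index first, Eigen::Index windowsize) {
        // middleCols replaces middleRows, rowwise() replaces colwise()
        return features_t.middleCols(first, windowsize).rowwise().mean();
    }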