rrcpprcppparallel

RcppParallel RVector push_back or something similar?


I am using RcppParallel to speed up some calculations. However, I am running out of memory in the process, so I would like to save results within the Parallel loop that are pass some relevance threshold. Below is a toy example to illustrate my point:

#include <Rcpp.h>
#include <RcppParallel.h>
using namespace Rcpp;

// [[Rcpp::depends(RcppParallel)]]
// [[Rcpp::plugins(cpp11)]]
struct Example : public RcppParallel::Worker {
  RcppParallel::RVector<double> xvals, xvals_output, yvals;
  Example(const NumericVector & xvals, NumericVector & yvals, NumericVector & xvals_output) : 
    xvals(xvals), xvals_output(xvals_output), yvals(yvals) {}
  void operator()(std::size_t begin, size_t end) {
    for(std::size_t i=begin; i < end; i++) {
      double y = xvals[i] * (xvals[i] - 1);
      // if(y < 0) {
      //   xvals_output.push_back(xvals[i]);
      //   yvals.push_back(y);
      // }
      xvals_output[i] = xvals[i];
      yvals[i] = y;
    }
  }
};
// [[Rcpp::export]]
List find_values(NumericVector xvals) {
  NumericVector xvals_output(xvals.size());
  NumericVector yvals(xvals.size());
  Example ex(xvals, yvals, xvals_output);
  parallelFor(0, xvals.size(), ex);
  List L = List::create(xvals_output, yvals);
  return(L);
}

The R code would be:

find_values(seq(-10,10, by=0.5))

The commented out code is what I would like to do.

That is, I would like to initialize an empty vector, and append only the y-values that pass a certain threshold and also the associated x-values.

In my real usage, I am calculating a MxN matrix, so memory is an issue.

What is the correct way to approach this issue?


Solution

  • If anyone ever comes across a similar problem, here's a solution using "concurrent_vector" from TBB (which RcppParallel uses under the hood and is available as a header).

    #include <Rcpp.h>
    #include <RcppParallel.h>
    #include <tbb/concurrent_vector.h>
    using namespace Rcpp;
    
    // [[Rcpp::depends(RcppParallel)]]
    // [[Rcpp::plugins(cpp11)]]
    struct Example : public RcppParallel::Worker {
      RcppParallel::RVector<double> xvals;
      tbb::concurrent_vector< std::pair<double, double> > &output;
      Example(const NumericVector & xvals, tbb::concurrent_vector< std::pair<double, double> > &output) : 
        xvals(xvals), output(output) {}
      void operator()(std::size_t begin, size_t end) {
        for(std::size_t i=begin; i < end; i++) {
          double y = xvals[i] * (xvals[i] - 1);
          if(y < 0) {
            output.push_back( std::pair<double, double>(xvals[i], y) );
          }
        }
      }
    };
    // [[Rcpp::export]]
    List find_values(NumericVector xvals) {
      tbb::concurrent_vector< std::pair<double, double> > output;
      Example ex(xvals,output);
      parallelFor(0, xvals.size(), ex);
      NumericVector xout(output.size());
      NumericVector yout(output.size());
      for(int i=0; i<output.size(); i++) {
        xout[i] = output[i].first;
        yout[i] = output[i].second;
      }
      List L = List::create(xout, yout);
      return(L);
    }
    

    Output:

    > find_values(seq(-10,10, by=0.5))
    [[1]]
    [1] 0.5
    
    [[2]]
    [1] -0.25