python c++ r data.table python-polars

Efficient rolling, non-equi joins


I'm looking for the most efficient current approach in R, Python, or C++ (with Rcpp).

Taking an example with financial data,

df
                   time      bid      ask                time_msc flags wdayLab  wday rowid
                 <POSc>    <num>    <num>                  <POSc> <int>   <ord> <num> <int>
 1: 2025-01-02 04:00:00 21036.48 21043.08 2025-01-02 04:00:00.888   134     Thu     5     1
 2: 2025-01-02 04:00:00 21037.54 21043.27 2025-01-02 04:00:00.888   134     Thu     5     2
 3: 2025-01-02 04:00:00 21036.52 21042.55 2025-01-02 04:00:00.888   134     Thu     5     3
 4: 2025-01-02 04:00:00 21036.82 21041.75 2025-01-02 04:00:00.888   134     Thu     5     4
 5: 2025-01-02 04:00:00 21036.79 21040.78 2025-01-02 04:00:00.891   134     Thu     5     5
 6: 2025-01-02 04:00:00 21035.86 21039.95 2025-01-02 04:00:00.891   134     Thu     5     6
 7: 2025-01-02 04:00:00 21036.05 21038.76 2025-01-02 04:00:00.891   134     Thu     5     7
 8: 2025-01-02 04:00:00 21034.74 21038.33 2025-01-02 04:00:00.891   134     Thu     5     8
 9: 2025-01-02 04:00:00 21034.72 21039.35 2025-01-02 04:00:00.892   134     Thu     5     9
10: 2025-01-02 04:00:00 21034.99 21038.08 2025-01-02 04:00:00.892   134     Thu     5    10

I want, for each rowid, the most recent rowid in the past where the ask was higher. My real data has 29,871,567 rows (can share if needed). The solution doesn't need to be a join as long as the last higher rowid is retrieved.
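To make the requirement concrete, here is a minimal brute-force reference in Python, using the ask values of the ten sample rows above. The function name `prev_higher_bruteforce` is made up for illustration, and the scan is O(n²), so this is only a sketch for checking small samples, not a candidate for the full dataset.

```python
from typing import List, Optional

# ask values of the ten sample rows (rowid 1..10)
ASKS = [21043.08, 21043.27, 21042.55, 21041.75, 21040.78,
        21039.95, 21038.76, 21038.33, 21039.35, 21038.08]


def prev_higher_bruteforce(asks: List[float]) -> List[Optional[int]]:
    """For each row, return the 1-based rowid of the most recent earlier
    row with a strictly higher ask, or None if there is none."""
    out: List[Optional[int]] = []
    for i, current in enumerate(asks):
        match = None
        # walk backwards so the first hit is the most recent one
        for j in range(i - 1, -1, -1):
            if asks[j] > current:
                match = j + 1  # rowids start at 1
                break
        out.append(match)
    return out


print(prev_higher_bruteforce(ASKS))
# [None, None, 2, 3, 4, 5, 6, 7, 6, 9]
```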

R data.table

I usually solve this using R's data.table joins:

library(data.table)
setDTthreads(parallel::detectCores() - 2) # no effect
df_joined <- df[, .(rowid, ask, time_msc)][, rowid_prevHi := rowid][, ask_prevHi := ask][
  df,
  on = .(rowid < rowid, ask >= ask),
  mult = "last",     # Take the closest (most recent) match
  # by = .EACHI,     # Do it row-by-row
  nomatch = NA       # Allow NA if no such row exists
  # .(i.rowid, last_higher_row = x.rowid, last_higher = x.time, lastHigh = x.ask)
][, difference_from_previous_higher := ask_prevHi - ask]

This works on smaller datasets because both multiple inequalities and the rolling condition mult = "last" are supported. However, it is single-threaded, and my machine cannot process the full dataset.

The expected result is below. I expect difference_from_previous_higher to always be positive and rowid_prevHi to always be smaller than rowid.

    rowid      ask                time_msc rowid_prevHi ask_prevHi                time      bid              i.time_msc flags
    <int>    <num>                  <POSc>        <int>      <num>              <POSc>    <num>                  <POSc> <int>
 1:     1 21043.08                    <NA>           NA         NA 2025-01-02 04:00:00 21036.48 2025-01-02 04:00:00.888   134
 2:     2 21043.27                    <NA>           NA         NA 2025-01-02 04:00:00 21037.54 2025-01-02 04:00:00.888   134
 3:     3 21042.55 2025-01-02 04:00:00.888            2   21043.27 2025-01-02 04:00:00 21036.52 2025-01-02 04:00:00.888   134
 4:     4 21041.75 2025-01-02 04:00:00.888            3   21042.55 2025-01-02 04:00:00 21036.82 2025-01-02 04:00:00.888   134
 5:     5 21040.78 2025-01-02 04:00:00.888            4   21041.75 2025-01-02 04:00:00 21036.79 2025-01-02 04:00:00.891   134
 6:     6 21039.95 2025-01-02 04:00:00.891            5   21040.78 2025-01-02 04:00:00 21035.86 2025-01-02 04:00:00.891   134
 7:     7 21038.76 2025-01-02 04:00:00.891            6   21039.95 2025-01-02 04:00:00 21036.05 2025-01-02 04:00:00.891   134
 8:     8 21038.33 2025-01-02 04:00:00.891            7   21038.76 2025-01-02 04:00:00 21034.74 2025-01-02 04:00:00.891   134
 9:     9 21039.35 2025-01-02 04:00:00.891            6   21039.95 2025-01-02 04:00:00 21034.72 2025-01-02 04:00:00.892   134
10:    10 21038.08 2025-01-02 04:00:00.892            9   21039.35 2025-01-02 04:00:00 21034.99 2025-01-02 04:00:00.892   134
    wdayLab  wday difference_from_previous_higher
      <ord> <num>                           <num>
 1:     Thu     5                              NA
 2:     Thu     5                              NA
 3:     Thu     5                            0.72
 4:     Thu     5                            0.80
 5:     Thu     5                            0.97
 6:     Thu     5                            0.83
 7:     Thu     5                            1.19
 8:     Thu     5                            0.43
 9:     Thu     5                            0.60
10:     Thu     5                            1.27
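The two expectations stated above can be turned into a quick sanity check. This is a minimal Python sketch, assuming the result columns have been exported as plain lists with None for missing values; `check_prev_higher` is a hypothetical helper name, and the sample values are rows 8 to 10 of the expected result.

```python
from typing import List, Optional


def check_prev_higher(rowid: List[int],
                      ask: List[float],
                      rowid_prev: List[Optional[int]],
                      ask_prev: List[Optional[float]]) -> bool:
    """Return True if every matched row points strictly backwards
    and to a strictly higher ask."""
    for r, a, rp, ap in zip(rowid, ask, rowid_prev, ask_prev):
        if rp is None or ap is None:
            continue  # no previous higher ask exists for this row
        if not (rp < r and ap - a > 0):
            return False
    return True


# rows 8..10 of the expected result
ok = check_prev_higher(
    rowid=[8, 9, 10],
    ask=[21038.33, 21039.35, 21038.08],
    rowid_prev=[7, 6, 9],
    ask_prev=[21038.76, 21039.95, 21039.35],
)
print(ok)  # True
```

Note that the join condition ask >= ask also admits ties, for which the difference would be zero, so a strict check like this one would flag them.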

polars

I've tried a Polars implementation in Python. join_asof is multithreaded, fast, and supports the backward strategy, but it doesn't support specifying other inequalities while joining. It only allows filtering after the join, which is not useful here.

import polars as pl

# Self as-of join: pair each row with the last row that has a strictly earlier time_msc
joined = df.join_asof(
    df.select(['rowid', 'time_msc', 'ask']).with_columns([
        pl.col('time_msc').alias('time_prevhi')
    ]),
    on="time_msc",
    strategy="backward",
    suffix="_prevhi",
    allow_exact_matches=False
).with_columns([
    (pl.col('rowid') - pl.col('rowid_prevhi')).alias('ticksdiff_prevhi'),
    (pl.col('ask') - pl.col('ask_prevhi')).alias('askdiff_prevhi'),
])

I'm not even sure how the matches are chosen, but of course ask is not always smaller than ask_prevhi, since I couldn't specify that condition in the join.

shape: (10, 13)
┌──────────┬──────────┬─────────────────────────┬───────┬─────────┬──────┬───────┬─────────────────────┬──────────────┬────────────┬─────────────────────────┬──────────────────┬────────────────┐
│ bid      ┆ ask      ┆ time_msc                ┆ flags ┆ wdayLab ┆ wday ┆ rowid ┆ time                ┆ rowid_prevhi ┆ ask_prevhi ┆ time_prevhi             ┆ ticksdiff_prevhi ┆ askdiff_prevhi │
│ ---      ┆ ---      ┆ ---                     ┆ ---   ┆ ---     ┆ ---  ┆ ---   ┆ ---                 ┆ ---          ┆ ---        ┆ ---                     ┆ ---              ┆ ---            │
│ f64      ┆ f64      ┆ datetime[ms]            ┆ i64   ┆ str     ┆ i64  ┆ i64   ┆ datetime[μs]        ┆ i64          ┆ f64        ┆ datetime[ms]            ┆ i64              ┆ f64            │
╞══════════╪══════════╪═════════════════════════╪═══════╪═════════╪══════╪═══════╪═════════════════════╪══════════════╪════════════╪═════════════════════════╪══════════════════╪════════════════╡
│ 21036.48 ┆ 21043.08 ┆ 2025-01-02 00:00:00.888 ┆ 134   ┆ Thu     ┆ 5    ┆ 1     ┆ 2025-01-02 00:00:00 ┆ null         ┆ null       ┆ null                    ┆ null             ┆ null           │
│ 21037.54 ┆ 21043.27 ┆ 2025-01-02 00:00:00.889 ┆ 134   ┆ Thu     ┆ 5    ┆ 2     ┆ 2025-01-02 00:00:00 ┆ 1            ┆ 21043.08   ┆ 2025-01-02 00:00:00.888 ┆ 1                ┆ 0.19           │
│ 21036.52 ┆ 21042.55 ┆ 2025-01-02 00:…         ┆ 134   ┆ Thu     ┆ 5    ┆ 3     ┆ 2025-01-02 00:00:00 ┆ 1            ┆ 21043.08   ┆ 2025-01-02 00:00:00.888 ┆ 2                ┆ -0.53          │
│ …        ┆ …        ┆ …                       ┆ …     ┆ …       ┆ …    ┆ …     ┆ …                   ┆ …            ┆ …          ┆ …                       ┆ …                ┆ …              │
└──────────┴──────────┴─────────────────────────┴───────┴─────────┴──────┴───────┴─────────────────────┴──────────────┴────────────┴─────────────────────────┴──────────────────┴────────────────┘
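On how the matches are chosen: a backward as-of match with exact matches disallowed pairs each left row with the last right row whose key is strictly smaller, and no other column takes part. The following is a rough pure-Python sketch of that rule, not Polars' actual implementation; `asof_backward_strict` and the integer millisecond keys are made up for illustration.

```python
from bisect import bisect_left
from typing import List, Optional


def asof_backward_strict(left_keys: List[int],
                         right_keys: List[int]) -> List[Optional[int]]:
    """For each left key, return the index of the last right row whose
    key is strictly smaller, or None. right_keys must be sorted."""
    out: List[Optional[int]] = []
    for key in left_keys:
        pos = bisect_left(right_keys, key)  # number of right keys < key
        out.append(pos - 1 if pos > 0 else None)
    return out


# hypothetical millisecond keys with repeated timestamps
keys = [888, 888, 891, 891, 892]
print(asof_backward_strict(keys, keys))  # self-join, as in the question
# [None, None, 1, 1, 3]
```

Since ask never enters the match, nothing prevents ask_prevhi from being lower than ask.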

I've also tried Polars' join_where, which supports inequalities but has no "nearest" constraint or strategy. The number of rows therefore grows quadratically, consuming all compute resources without producing a result.

jw = df.join_where(
    df.select(['rowid', 'time_msc', 'ask']),
    pl.col("rowid") > pl.col("rowid_prevhi"),
    pl.col("ask") > pl.col("ask_prevhi"),
    suffix="_prevhi",
)
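To see why the row count blows up, one can count the pairs that satisfy both inequalities before any "most recent" reduction is applied. This is a small Python sketch on the ten sample asks; `count_inequality_pairs` is a hypothetical helper, and the count is simply the number of (earlier, later) pairs that meet the two conditions.

```python
from typing import List

# ask values of the ten sample rows (rowid 1..10)
ASKS = [21043.08, 21043.27, 21042.55, 21041.75, 21040.78,
        21039.95, 21038.76, 21038.33, 21039.35, 21038.08]


def count_inequality_pairs(asks: List[float]) -> int:
    """Count pairs (earlier, later) where the earlier ask is strictly higher."""
    return sum(
        1
        for later in range(len(asks))
        for earlier in range(later)
        if asks[earlier] > asks[later]
    )


print(count_inequality_pairs(ASKS))  # 42 of the 45 possible pairs
```

With n rows the worst case is n(n-1)/2 pairs, which for 29,871,567 rows is roughly 4.5 × 10^14.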

My next approach might be to loop over each row using an Rcpp function executed in parallel from R, which retrieves the rowid of the last previous higher ask. Or perhaps frollapply from data.table would do the trick?

Suggestions most welcome.


Solution

  • Here is an Rcpp stack-based approach to the Previous Greater Element problem with O(n) time complexity. It is also described here or here. I don't know how fast you need this to be; maybe Java is faster. You could also try OpenMP to parallelize the for-loop.

    For 1 million rows it runs with a median of 19.1 ms.

    Code

    d <- data.frame(
      rowid = 1:10e5,
      ask = sample(1:10e5)
    )
    
    library(Rcpp)
    
    cppFunction('
      List pge(NumericVector rowid, NumericVector ask) {
        int n = rowid.size();
        // Outputs default to NA for rows without a previous higher ask
        NumericVector prevHigherRowid(n, NA_REAL);
        NumericVector prevHigherAsk(n, NA_REAL);
        NumericVector diff(n, NA_REAL);

        // Indices of earlier rows, kept so that their asks strictly decrease
        std::vector<int> stack;
        stack.reserve(n);

        for(int i = 0; i < n; i++) {
          double currentAsk = ask[i];

          // Drop rows that are not higher than the current ask
          while(!stack.empty() && currentAsk >= ask[stack.back()]) {
            stack.pop_back();
          }

          // The top of the stack is now the most recent strictly higher ask
          if(!stack.empty()) {
            prevHigherRowid[i] = rowid[stack.back()];
            prevHigherAsk[i] = ask[stack.back()];
            diff[i] = prevHigherAsk[i] - currentAsk;
          }

          stack.push_back(i);
        }

        return List::create(
          Named("rowid_prevHi") = prevHigherRowid,
          Named("ask_prevHi") = prevHigherAsk,
          Named("difference_from_previous_higher") = diff
        );
      }
    ')
    
    # Thin R wrapper that attaches the three result columns to d
    pge_r <- function(d) {
      res <- pge(d$rowid, d$ask)
      d$rowid_prevHi <- res$rowid_prevHi
      d$ask_prevHi <- res$ask_prevHi
      d$difference_from_previous_higher <- res$difference_from_previous_higher
      d
    }
     
    bench::mark(pge_r(d))
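
For readers following along in Python, the same stack idea can be sketched as below. This is a plain-Python illustration of the Previous Greater Element technique, not a benchmarked port of the Rcpp code; `prev_greater` is a name chosen here, and the input is the ten sample asks from the question.

```python
from typing import List, Optional

# ask values of the ten sample rows (rowid 1..10)
ASKS = [21043.08, 21043.27, 21042.55, 21041.75, 21040.78,
        21039.95, 21038.76, 21038.33, 21039.35, 21038.08]


def prev_greater(asks: List[float]) -> List[Optional[int]]:
    """Return, per row, the 0-based index of the most recent earlier row
    with a strictly higher ask (None if there is none)."""
    result: List[Optional[int]] = [None] * len(asks)
    stack: List[int] = []  # indices whose asks are strictly decreasing
    for i, current in enumerate(asks):
        # rows that are not higher than the current one can never be
        # the answer for any later row, so drop them
        while stack and asks[stack[-1]] <= current:
            stack.pop()
        if stack:
            result[i] = stack[-1]
        stack.append(i)
    return result


# convert 0-based indices to the 1-based rowids used in the question
rowid_prev_hi = [None if j is None else j + 1 for j in prev_greater(ASKS)]
print(rowid_prev_hi)
# [None, None, 2, 3, 4, 5, 6, 7, 6, 9]
```

Each index is pushed once and popped at most once, which is where the O(n) bound comes from.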