Tags: r, bigdata, apache-arrow

Joining Arrow tables in R without overflowing memory or exceeding Acero's "bytes of key data" limit


I'm working with big data, using R and Apache Arrow. My data is split between two datasets: vals, which holds the values I want to aggregate, and meta, which holds the grouping variables.

To aggregate vals down to a manageable size I first need to join it with meta. I believe that doing this in a memory-efficient, piecewise manner is exactly what Arrow excels at, but I can't make it happen.

Here's my attempt:

library(arrow)
library(dplyr)

# Open both datasets lazily; nothing is read into memory yet
vals <- open_dataset(path_to_vals)
meta <- open_dataset(path_to_meta)

# Join on id, then average the value columns within each group from meta
temp <- left_join(vals, meta, by = "id") %>%
  group_by(grouping_variables_from_meta) %>%
  summarise(across(all_of(variables_from_vals), mean))

write_dataset(temp, "some_path")

As soon as I invoke write_dataset, my memory usage balloons. It seems that Arrow is trying to perform the join by holding both tables in memory. Even if I temporarily sidestep this by requesting absurd amounts of memory from the cluster (not a long-term solution), the process eventually fails with:

Error: Invalid: There are more than 2^32 bytes of key data. Acero cannot process a join of this magnitude.

I have used Arrow to summarise much larger datasets in the past with a) very little memory usage and b) no errors about the key data being too large. But on those occasions I always had the grouping variables in a single dataset and didn't need to join first. So I'm pretty sure the join step is where the problem lies.

Should I be doing this differently? Is there something I've misunderstood?


sessionInfo():

R version 4.3.1 (2023-06-16)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Rocky Linux 8.7 (Green Obsidian)

Matrix products: default
BLAS:   /n/sw/helmod-rocky8/apps/Core/R/4.3.1-fasrc01/lib64/R/lib/libRblas.so 
LAPACK: /n/sw/helmod-rocky8/apps/Core/R/4.3.1-fasrc01/lib64/R/lib/libRlapack.so;  LAPACK version 3.11.0

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C              
 [3] LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8    
 [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8   
 [7] LC_PAPER=en_US.UTF-8       LC_NAME=C                 
 [9] LC_ADDRESS=C               LC_TELEPHONE=C            
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C       

time zone: America/New_York
tzcode source: system (glibc)

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] dplyr_1.1.3     arrow_13.0.0.1  nvimcom_0.9-147

loaded via a namespace (and not attached):
 [1] assertthat_0.2.1 utf8_1.2.3       R6_2.5.1         bit_4.0.5       
 [5] tidyselect_1.2.0 magrittr_2.0.3   glue_1.6.2       tibble_3.2.1    
 [9] pkgconfig_2.0.3  bit64_4.0.5      generics_0.1.3   lifecycle_1.0.3 
[13] cli_3.6.1        fansi_1.0.4      vctrs_0.6.3      compiler_4.3.1  
[17] purrr_1.0.2      tools_4.3.1      pillar_1.9.0     rlang_1.1.1   

Solution

  • Due to the way Acero (the C++ execution engine used for the joins) stores the key data (uint32_t), joins with more than 4 GB of key data are not supported at the moment.

    The error message you are seeing comes from a check that prevents the data from being clobbered and broken results from being returned silently. However, there seem to be some issues with that check, which are being worked on in https://github.com/apache/arrow/pull/37709, so you may be affected even with less than 4 GB of key data. One possible workaround is sketched below.
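
    If you can't reduce the amount of key data, one option is to let DuckDB execute the join instead of Acero and only hand the result back to Arrow for writing. The following is a minimal sketch, not a tested solution: it assumes both datasets end up on the same default DuckDB connection that arrow::to_duckdb() uses, and it reuses the placeholder names from the question (path_to_vals, grouping_variables_from_meta, variables_from_vals).

    library(arrow)
    library(dplyr)
    library(duckdb)

    vals <- open_dataset(path_to_vals)
    meta <- open_dataset(path_to_meta)

    temp <- vals %>%
      to_duckdb() %>%                            # expose vals to DuckDB as a virtual table
      left_join(to_duckdb(meta), by = "id") %>%  # the join now runs in DuckDB, not Acero
      group_by(grouping_variables_from_meta) %>%
      summarise(across(all_of(variables_from_vals), mean)) %>%
      to_arrow()                                 # stream the result back into Arrow

    write_dataset(temp, "some_path")

    DuckDB is not subject to Acero's 2^32-byte key-data check and can spill its hash join to disk (you may need to point it at a temporary directory and set a memory limit), although whether the whole pipeline stays within your memory budget depends on your data. Alternatively, if meta is small enough to collect into memory, you could join and summarise vals chunk by chunk, accumulating per-group sums and counts rather than means, and combine the chunks at the end.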