I'm working with big data, using R and Apache Arrow. My data is split between two datasets, call them:

vals: a hive-partitioned set of parquets; each row contains an ID (a long string) and hundreds of columns of data (ints)
meta: a parquet of metadata; each row contains the same ID and some grouping variables.

To aggregate vals down to a manageable size I need to first join it with meta. I believe that doing this in a memory-efficient, piecewise manner is exactly what Arrow excels at, but I can't make it happen.
Here's my attempt:
library(arrow)
library(dplyr)

vals = open_dataset(path_to_vals)   # hive-partitioned parquet dataset
meta = open_dataset(path_to_meta)   # parquet of metadata

temp = left_join(vals, meta, by = 'id') %>%
  group_by(grouping_variables_from_meta) %>%
  summarise(across(all_of(variables_from_vals), mean))

write_dataset(temp, 'some_path')
As soon as I invoke write_dataset, my memory usage balloons. It seems to me that Arrow is trying to do the join by holding both tables in memory. Even if I temporarily sidestep this problem by requesting absurd amounts of memory from the cluster (not a long-term solution), the process eventually fails with:
Error: Invalid: There are more than 2^32 bytes of key data. Acero cannot process a join of this magnitude.
I have used Arrow to summarise much larger datasets in the past with a) very little memory usage and b) no errors about the key data being too large. But on those occasions I always had the grouping variables in a single dataset and didn't need to join first, so I'm pretty sure the join step is where the problem lies.
Should I be doing this differently? Is there something I've misunderstood?
sessionInfo():
R version 4.3.1 (2023-06-16)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Rocky Linux 8.7 (Green Obsidian)
Matrix products: default
BLAS: /n/sw/helmod-rocky8/apps/Core/R/4.3.1-fasrc01/lib64/R/lib/libRblas.so
LAPACK: /n/sw/helmod-rocky8/apps/Core/R/4.3.1-fasrc01/lib64/R/lib/libRlapack.so; LAPACK version 3.11.0
locale:
[1] LC_CTYPE=en_US.UTF-8 LC_NUMERIC=C
[3] LC_TIME=en_US.UTF-8 LC_COLLATE=en_US.UTF-8
[5] LC_MONETARY=en_US.UTF-8 LC_MESSAGES=en_US.UTF-8
[7] LC_PAPER=en_US.UTF-8 LC_NAME=C
[9] LC_ADDRESS=C LC_TELEPHONE=C
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C
time zone: America/New_York
tzcode source: system (glibc)
attached base packages:
[1] stats graphics grDevices utils datasets methods base
other attached packages:
[1] dplyr_1.1.3 arrow_13.0.0.1 nvimcom_0.9-147
loaded via a namespace (and not attached):
[1] assertthat_0.2.1 utf8_1.2.3 R6_2.5.1 bit_4.0.5
[5] tidyselect_1.2.0 magrittr_2.0.3 glue_1.6.2 tibble_3.2.1
[9] pkgconfig_2.0.3 bit64_4.0.5 generics_0.1.3 lifecycle_1.0.3
[13] cli_3.6.1 fansi_1.0.4 vctrs_0.6.3 compiler_4.3.1
[17] purrr_1.0.2 tools_4.3.1 pillar_1.9.0 rlang_1.1.1
Due to the way Acero (the C++ execution engine used for the joins) stores the key data (uint32_t), joins with more than 4 GB of key data are not supported at the moment.
The error message you are seeing comes from a check meant to prevent data from being clobbered and silently returning broken results. However, there appear to be some issues with that check, which are being worked on in https://github.com/apache/arrow/pull/37709, so you could be affected even with less than 4 GB of key data.
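If the 4 GB limit is what you are hitting, one workaround is to avoid handing Acero the whole join at once: read meta into memory (assuming it is small relative to vals), then loop over the hive partitions of vals, joining and partially aggregating one partition at a time. Because means cannot simply be averaged across partitions, the per-partition step keeps sums and counts, which are combined at the end. This is only a sketch under assumptions not stated in the question: that meta fits in memory, that list.dirs() reflects the partition layout, and the placeholder column names are the same ones used in your code.

# Minimal sketch, not a drop-in fix. Assumptions: `meta` fits in memory,
# each hive partition of `vals` can be joined and aggregated on its own,
# and `grouping_variables_from_meta` / `variables_from_vals` are the
# placeholders from the question.
library(arrow)
library(dplyr)
library(purrr)

meta_tbl = read_parquet(path_to_meta)                 # assumed small enough to collect

# One subdirectory per hive partition of `vals` (an assumption about the layout)
part_dirs = list.dirs(path_to_vals, recursive = FALSE)

# For each partition: join against meta, then keep per-group sums and counts
partials = map(part_dirs, function(d) {
  open_dataset(d) %>%
    left_join(meta_tbl, by = 'id') %>%
    group_by(grouping_variables_from_meta) %>%
    summarise(across(all_of(variables_from_vals), sum), n = n()) %>%
    collect()
})

# Combine the partial sums and counts, then turn them back into means
result = bind_rows(partials) %>%
  group_by(grouping_variables_from_meta) %>%
  summarise(across(all_of(variables_from_vals), sum), n = sum(n)) %>%
  mutate(across(all_of(variables_from_vals), ~ .x / n)) %>%
  select(-n)

write_dataset(result, 'some_path')

If meta itself is what pushes the key data past the limit (e.g. very many long string IDs), another angle worth trying is to shrink the key before joining, for example by mapping the string IDs to integers in both tables first; that doesn't change the logic above, only the number of bytes Acero has to hold per key.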