I need to compute a cumulative sum over several columns of a grouped and ordered (large) Spark DataFrame. I used to do it with base::cumsum() and dplyr::mutate_at(), and it worked fine. It no longer works with this session info:
R version 4.4.1 (2024-06-14)
spark_version = "3.5.0"
sparklyr_1.8.4 dbplyr_2.4.0 dplyr_1.1.4
library("dplyr"); library("sparklyr"); library("dbplyr")
spark_version = "3.5.0"
sc = spark_connect(master = "local", version = spark_version)
# --- copying mtcars data into spark for the example
sdf_mtcars = sdf_copy_to(sc = sc, x = mtcars, name = "sdf_mtcars", overwrite = TRUE)
sdf_mtcars %>%
  group_by(cyl) %>%
  window_order(disp) %>%
  mutate_at(.vars = 'mpg', .funs = cumsum) %>%
  ungroup() %>%
  collect()
Error in `mutate()`:
ℹ In argument: `mpg = .Primitive("cumsum")(mpg)`
Caused by error in `mutate_at()`:
! object 'mpg' not found
It works fine if I use sum() instead of cumsum(). It also works fine if I do:
mutate("mpg_cumulative" = cumsum(mpg))
but I have to do it over several columns whose names are stored in a character vector (like c('mpg') in my example).
It also works fine if I do:
mutate_at(.vars = 'mpg', .funs = ~ cumsum(.)) %>%
but the syntax looks a bit odd.
Thank you.
I'm using Databricks so the setup is a little different, but it's Spark 3.5.0.
library("dplyr"); library("sparklyr"); library("dbplyr")
sc <- sparklyr::spark_connect(method = "databricks")
# just example output
mtcars %>% arrange(disp) %>% select(mpg, disp, cyl, drat) %>% head(10)
                mpg  disp cyl drat
Toyota Corolla 33.9  71.1   4 4.22
Honda Civic    30.4  75.7   4 4.93
Fiat 128       32.4  78.7   4 4.08
Fiat X1-9      27.3  79.0   4 4.08
Lotus Europa   30.4  95.1   4 3.77
Datsun 710     22.8 108.0   4 3.85
Toyota Corona  21.5 120.1   4 3.70
Porsche 914-2  26.0 120.3   4 4.43
Volvo 142E     21.4 121.0   4 4.11
Merc 230       22.8 140.8   4 3.92
Now for the actual question. As recommended by @Limey, use dplyr::across:
sdf_mtcars = sdf_copy_to(sc = sc, x = mtcars, name = "sdf_mtcars", overwrite = TRUE)
sdf_mtcars %>%
  select(mpg, disp, cyl, drat) %>% # just to reduce output
  group_by(cyl) %>%
  window_order(disp) %>%
  mutate(across(c(mpg, drat), cumsum)) %>%
  ungroup() %>%
  collect()
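Since the question asks about column names held in a character vector, here is a minimal sketch of the same across() call driven by all_of(). The vector name `cols` is hypothetical, and the example runs on the local mtcars data frame (with arrange() standing in for window_order()) so it can be tried without a cluster. I would expect the same selection to work on the Spark table, but I have not verified that on this exact setup.

```r
library(dplyr)

# Hypothetical character vector holding the names of the columns to accumulate
cols <- c("mpg", "drat")

local_result <- mtcars %>%
  select(mpg, disp, cyl, drat) %>%
  group_by(cyl) %>%
  arrange(disp, .by_group = TRUE) %>%      # local stand-in for window_order(disp)
  mutate(across(all_of(cols), cumsum)) %>% # all_of() selects columns by name from the vector
  ungroup()

# Assumed Spark equivalent (not run here):
# sdf_mtcars %>%
#   group_by(cyl) %>%
#   window_order(disp) %>%
#   mutate(across(all_of(cols), cumsum)) %>%
#   ungroup() %>%
#   collect()
```

Using all_of() rather than passing `cols` bare makes it explicit that the names come from an external vector and not from a column called `cols`.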
I also always recommend dplyr::show_query to make sure it's doing what you actually want:
sdf_mtcars %>%
  select(mpg, disp, cyl, drat) %>% # just to reduce output
  group_by(cyl) %>%
  window_order(disp) %>%
  mutate(across(c(mpg, drat), cumsum)) %>%
  ungroup() %>%
  dplyr::show_query()
<SQL>
SELECT
SUM(`mpg`) OVER (PARTITION BY `cyl` ORDER BY `disp` ROWS UNBOUNDED PRECEDING) AS `mpg`,
`disp`,
`cyl`,
SUM(`drat`) OVER (PARTITION BY `cyl` ORDER BY `disp` ROWS UNBOUNDED PRECEDING) AS `drat`
FROM `sdf_mtcars`
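If no Spark session is at hand, the translation can also be inspected offline. The sketch below assumes dbplyr's lazy_frame() with its default simulated connection, so the generated SQL is generic and may differ in detail from what the Spark backend emits. The names `fake_tbl`, `cols`, `query` and `sql_text` are made up for the example.

```r
library(dplyr)
library(dbplyr)

# A fake remote table with the right column names; no database is contacted
fake_tbl <- lazy_frame(mpg = 1, disp = 1, cyl = 1, drat = 1)

cols <- c("mpg", "drat")  # hypothetical vector of column names

query <- fake_tbl %>%
  group_by(cyl) %>%
  window_order(disp) %>%
  mutate(across(all_of(cols), cumsum)) %>%
  ungroup()

# Render the SQL as a plain string so it can be printed or searched
sql_text <- as.character(sql_render(query))
cat(sql_text, "\n")
```

The rendered statement should contain a windowed SUM partitioned by `cyl`, which is the thing to look for before running the query on real data.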
Also, if you really want to use mutate_at, you just need to use tidyselect, selecting the columns with vars() instead of a string:
sdf_mtcars %>%
  group_by(cyl) %>%
  window_order(disp) %>%
  mutate_at(.vars = vars(mpg, drat), .funs = cumsum) %>%
  ungroup() %>%
  collect()
Lastly, the syntax ~ cumsum(.) is just the tidyverse (purrr-style) shorthand for an anonymous function. Both . and .x refer to the first argument, though it is more conventionally written as .funs = ~ cumsum(.x).
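To make the formula shorthand less mysterious, here is a small sketch comparing it with ordinary anonymous functions. rlang::as_function() is used only to illustrate how such a formula is read; dplyr's internal handling of `.funs` may differ. The names `f_dot`, `f_x`, `f_plain` and `f_lambda` are hypothetical.

```r
library(rlang)

# Formula shorthand: `.` and `.x` both stand for the first argument
f_dot <- as_function(~ cumsum(.))
f_x   <- as_function(~ cumsum(.x))

# Equivalent ordinary anonymous functions
f_plain  <- function(x) cumsum(x)
f_lambda <- \(x) cumsum(x)   # base R lambda syntax, available from R 4.1

v <- c(1, 2, 3, 4)
res_dot    <- f_dot(v)
res_x      <- f_x(v)
res_plain  <- f_plain(v)
res_lambda <- f_lambda(v)
```

All four calls compute the same running total, so which form to use is mostly a matter of readability.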