Tags: r, dplyr, sparklyr, dbplyr

base::cumsum() not working anymore with dplyr::mutate_at() in R Sparklyr


Issue

I need to compute a cumulative sum over several columns of a grouped and ordered (large) Spark DataFrame. I used to do this with base::cumsum() and dplyr::mutate_at(), and it worked fine. It no longer works with the following session info:

sessionInfo()

R version 4.4.1 (2024-06-14)
spark_version = "3.5.0"
sparklyr_1.8.4    dbplyr_2.4.0      dplyr_1.1.4

Reproducible example

library("dplyr"); library("sparklyr"); library("dbplyr")
spark_version = "3.5.0"
sc = spark_connect(master = "local", version = spark_version)

# --- copying mtcars data into spark for the example
sdf_mtcars = sdf_copy_to(sc = sc, x = mtcars, name = "sdf_mtcars", overwrite = TRUE)

sdf_mtcars %>% group_by(cyl) %>%
  window_order(disp) %>%
  mutate_at(.vars = 'mpg', .funs = cumsum) %>%
  ungroup() %>% 
  collect()

This now fails with the following error:

Error in `mutate()`:
ℹ In argument: `mpg = .Primitive("cumsum")(mpg)`
Caused by error in `mutate_at()`:
! object 'mpg' not found

It works fine if I use sum() instead of cumsum(). It also works fine if I do:

mutate("mpg_cumulative" =  cumsum(mpg)) 

but I need to apply it to several columns whose names are stored in a character vector (like c('mpg') in my example).

It also works fine if I do:

mutate_at(.vars = 'mpg', .funs = ~ cumsum(.)) %>%

but that syntax looks a bit odd to me. Is it the intended way?

Thank you.


Solution

  • I'm using Databricks, so my setup is slightly different, but it is also Spark 3.5.0.

    library("dplyr"); library("sparklyr"); library("dbplyr")
    sc <- sparklyr::spark_connect(method = "databricks")
    # Preview of the local data ordered by disp (for reference only)
    mtcars %>% arrange(disp) %>% select(mpg, disp, cyl, drat) %>% head(10)
                    mpg  disp cyl drat
    Toyota Corolla 33.9  71.1   4 4.22
    Honda Civic    30.4  75.7   4 4.93
    Fiat 128       32.4  78.7   4 4.08
    Fiat X1-9      27.3  79.0   4 4.08
    Lotus Europa   30.4  95.1   4 3.77
    Datsun 710     22.8 108.0   4 3.85
    Toyota Corona  21.5 120.1   4 3.70
    Porsche 914-2  26.0 120.3   4 4.43
    Volvo 142E     21.4 121.0   4 4.11
    Merc 230       22.8 140.8   4 3.92
    

    Now for the actual question: as recommended by @Limey, use dplyr::across().

    sdf_mtcars = sdf_copy_to(sc = sc, x = mtcars, name = "sdf_mtcars", overwrite = TRUE)
    
    sdf_mtcars %>% 
      select(mpg, disp, cyl, drat) %>% # just to reduce output
      group_by(cyl) %>%
      window_order(disp) %>%
      mutate(across(c(mpg, drat), cumsum)) %>%
      ungroup() %>% 
      collect()
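
    Since the question stores the column names in a character vector, here is a minimal sketch of the same across() call driven by such a vector through tidyselect's all_of(). It runs on the local mtcars data frame so it can be checked without a cluster. The vector name cols is hypothetical, and arrange(disp, .by_group = TRUE) is only a local stand-in for window_order(disp). On the Spark table I would expect all_of(cols) to be accepted in the same position, but I have not verified that on every sparklyr/dbplyr version.

    ```r
    library(dplyr)

    # Hypothetical character vector of column names (not in the original post)
    cols <- c("mpg", "drat")

    res <- mtcars %>%
      select(mpg, disp, cyl, drat) %>%
      group_by(cyl) %>%
      arrange(disp, .by_group = TRUE) %>%    # local stand-in for window_order(disp)
      mutate(across(all_of(cols), cumsum)) %>%  # cumulative sum within each cyl group
      ungroup()

    head(res)
    ```

    all_of() is strict: it raises an error if any name in cols is missing from the table, which is usually what you want when the vector is built programmatically.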
    

    I also recommend checking the generated SQL with dplyr::show_query() to confirm that it does what you actually want:

    sdf_mtcars %>% 
      select(mpg, disp, cyl, drat) %>% # just to reduce output
      group_by(cyl) %>%
      window_order(disp) %>%
      mutate(across(c(mpg, drat), cumsum)) %>%
      ungroup() %>% 
      dplyr::show_query()
    
    <SQL>
    SELECT
      SUM(`mpg`) OVER (PARTITION BY `cyl` ORDER BY `disp` ROWS UNBOUNDED PRECEDING) AS `mpg`,
      `disp`,
      `cyl`,
      SUM(`drat`) OVER (PARTITION BY `cyl` ORDER BY `disp` ROWS UNBOUNDED PRECEDING) AS `drat`
    FROM `sdf_mtcars`
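
    If no Spark connection is at hand, the translation can still be inspected offline. The sketch below assumes dbplyr's lazy_frame(), which builds a table against a generic simulated backend, so the SQL it prints may differ in quoting or dialect details from what Spark receives. The column values passed to lazy_frame() are placeholders.

    ```r
    library(dplyr)
    library(dbplyr)

    # Connection-free stand-in for the Spark table (generic simulated backend)
    lf <- lazy_frame(mpg = 1, disp = 1, cyl = 1, drat = 1)

    q <- lf %>%
      group_by(cyl) %>%
      window_order(disp) %>%
      mutate(across(c(mpg, drat), cumsum)) %>%
      ungroup()

    # Print the SQL that dbplyr would generate for this pipeline
    show_query(q)
    ```

    What to look for is a windowed SUM with the expected PARTITION BY and ORDER BY clauses for each column passed to across().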
    

    Also, if you really want to keep mutate_at(), select the columns with vars(), which accepts tidyselect-style column specifications:

    sdf_mtcars %>% group_by(cyl) %>%
      window_order(disp) %>%
      mutate_at(.vars = vars(mpg, drat), .funs = cumsum) %>%
      ungroup() %>% 
      collect()
    

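    Lastly, ~ cumsum(.) is just the tidyverse shorthand for an anonymous function (a purrr-style lambda). Both . and .x refer to the first argument, so .funs = ~ cumsum(.x) is equivalent and is the form you will see most often in the documentation.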
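
    To make the lambda shorthand concrete, here is a small sketch that converts both formula spellings into ordinary functions with rlang::as_function() and compares them with a plain function. It only illustrates the formula semantics on a local vector; how dplyr handles the formula internally for a Spark table is not shown here.

    ```r
    library(rlang)

    f_dot  <- as_function(~ cumsum(.))    # "." is the first argument
    f_dotx <- as_function(~ cumsum(.x))   # ".x" is the same first argument
    f_base <- function(x) cumsum(x)       # plain anonymous-function equivalent

    x <- c(1, 2, 3)

    f_dot(x)                          # 1 3 6
    identical(f_dot(x), f_dotx(x))    # TRUE
    identical(f_dot(x), f_base(x))    # TRUE
    ```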