Writing a function to use with spark_apply() from sparklyr


test <- data.frame('prod_id'= c("shoe", "shoe", "shoe", "shoe", "shoe", "shoe", "boat", "boat","boat","boat","boat","boat"), 
               'seller_id'= c("a", "b", "c", "d", "e", "f", "a","g", "h", "r", "q", "b"), 
               'Dich'= c(1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0),
               'price' = c(120, 20, 10, 4, 3, 4, 30, 43, 56, 88, 75, 44)
                )
test

       prod_id seller_id Dich price
 1     shoe         a    1   120
 2     shoe         b    0    20
 3     shoe         c    0    10
 4     shoe         d    0     4
 5     shoe         e    0     3
 6     shoe         f    0     4
 7     boat         a    0    30
 8     boat         g    0    43
 9     boat         h    1    56
10     boat         r    0    88
11     boat         q    0    75
12     boat         b    0    44

I want to add a column holding, for every row, the difference between its price and the price of the reference row in the same prod_id group, where the reference row is the one with Dich == 1. Groups without a reference row should get NA. Locally, this dplyr code does it:

library(dplyr)

test %>%
  group_by(prod_id) %>%
  mutate(diff_p = if (any(Dich == 1)) price - price[Dich == 1] else NA)

       prod_id seller_id Dich price diff_p
 1     shoe         a    1   120      0
 2     shoe         b    0    20     -100
 3     shoe         c    0    10     -110
 4     shoe         d    0     4     -116
 5     shoe         e    0     3     -117
 6     shoe         f    0     4     -116
 7     boat         a    0    30     -26
 8     boat         g    0    43     -13
 9     boat         h    1    56       0
10     boat         r    0    88      32
11     boat         q    0    75      19
12     boat         b    0    44     -12

Now I would like to wrap the same logic in a function that I can pass to sparklyr::spark_apply(), so that it gives the same result on a Spark data frame.

trans <- function(e) {
  e %>%
    group_by(prod_id) %>%
    mutate(diff_p = if (any(Dich == 1)) price - price[Dich == 1] else NA)
}

The RStudio documentation for sparklyr covers applying R functions to Spark objects:

https://spark.rstudio.com/guides/distributed-r/

Here is an example of a function that scales all of the columns of a spark dataframe.

 trees_tbl %>%
 spark_apply(function(e) scale(e))

How should I write the function above so that it works with spark_apply()? It would also help if you could explain the argument e: what does it stand for?


Solution

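  • The function passed to spark_apply() runs on the Spark workers, and e is the ordinary R data frame holding one partition of the Spark DataFrame. Every package the function uses must be available on the workers, and every function must be findable there: either namespace it (dplyr::group_by) or attach the package inside the function. In particular, %>% only works if the function itself calls library(magrittr). One way that works: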

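    trans <- function(e) {
        # Attach magrittr on the worker so that %>% is available there.
        library(magrittr)
    
        e %>%
            dplyr::group_by(prod_id) %>% 
            dplyr::mutate(diff_p = if(any(Dich ==1)) price -price[Dich == 1] else NA)
    }
    
    # test_sf is assumed to be the Spark copy of test, created beforehand
    # (for example with copy_to()).
    sparklyr::spark_apply(
      x = test_sf, 
      f = trans)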
    # Source: spark<?> [?? x 5]
       prod_id seller_id  Dich price diff_p
       <chr>   <chr>     <dbl> <dbl>  <dbl>
     1 shoe    a             1   120      0
     2 shoe    b             0    20   -100
     3 shoe    c             0    10   -110
     4 shoe    d             0     4   -116
     5 shoe    e             0     3   -117
     6 shoe    f             0     4   -116
     7 boat    a             0    30    -26
     8 boat    g             0    43    -13
     9 boat    h             1    56      0
    10 boat    r             0    88     32
    # … with more rows
    # ℹ Use `print(n = ...)` to see more rows
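
To make the role of e more concrete, here is a minimal local sketch that needs no Spark connection. It imitates what spark_apply() does: cut the data into chunks, call the function once per chunk with that chunk as e, and bind the results. The names trans_local, chunks and result are made up for this illustration, and trans_local is a base R stand-in for trans, not the function from the answer. Splitting by prod_id is also an assumption made here so that every chunk holds a complete group; real Spark partitions are not guaranteed to line up with groups.

```r
# Local sketch: mimic how spark_apply() hands data to f, one chunk at a time.
# `test` is the data frame from the question.
test <- data.frame(
  prod_id   = rep(c("shoe", "boat"), each = 6),
  seller_id = c("a", "b", "c", "d", "e", "f", "a", "g", "h", "r", "q", "b"),
  Dich      = c(1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0),
  price     = c(120, 20, 10, 4, 3, 4, 30, 43, 56, 88, 75, 44),
  stringsAsFactors = FALSE
)

# Hypothetical base R stand-in for trans(): `e` is a plain data frame
# holding one chunk of the data.
trans_local <- function(e) {
  ref <- e$price[e$Dich == 1]
  # Assumes at most one reference row (Dich == 1) per chunk.
  e$diff_p <- if (length(ref) == 1) e$price - ref else NA_real_
  e
}

# Split by prod_id so each chunk is a complete group, apply f, recombine.
chunks <- split(test, test$prod_id)
result <- do.call(rbind, lapply(chunks, trans_local))
rownames(result) <- NULL
print(result)
```

If a chunk contains no row with Dich == 1, every diff_p in that chunk comes out as NA. The same would happen on Spark if a prod_id group were spread over several partitions. spark_apply() has a group_by argument (for example group_by = "prod_id") intended for this case; the exact layout of the returned columns is worth checking against the sparklyr documentation.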