multidplyr and group_by() and filter()


I have the following data frame, and I want to find all the IDs that have different USAGE values but the same TYPE.

ID <- rep(1:4, each=3)
USAGE <- c("private","private","private","private",
"taxi","private","taxi","taxi","taxi","taxi","private","taxi")
TYPE <- c("VW","VW","VW","VW","MER","VW","VW","VW","VW","VW","VW","VW")
df <- data.frame(ID,USAGE,TYPE)

If I run

df %>% group_by(ID, TYPE) %>% filter(n_distinct(USAGE)>1)

I get the intended result. But my original data frame has more than 2 million rows, so I would like to use all my cores for this operation.

I tried this code with multidplyr:

f1 <- partition(df, ID)
f2 <- f1 %>% group_by(ID, TYPE) %>% filter(n_distinct(USAGE)>1)
f3 <- collect(f2)

But then the following message appears:

Warning message: group_indices_.grouped_df ignores extra arguments

after

f1 <- partition(df, ID)

and

Error in checkForRemoteErrors(lapply(cl, recvResult)) : 
  4 nodes produced errors; first error: Evaluation error: object 'f1' not found.

after

f2 <- f1%>% group_by(ID, TYPE) %>% filter(f1, n_distinct(USAGE)>1)

What would be the correct way to implement the whole operation in multidplyr? Thanks a lot.


Solution

  • You should include all grouping variables in your call to partition(). That way each core has all of the data needed to perform a calculation for a given group.

    library(tidyverse)
    library(multidplyr)
    
    fast <- df %>%
      partition(ID, TYPE) %>%
      group_by(ID, TYPE) %>%
      filter(n_distinct(USAGE) > 1) %>%
      collect()
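
    To see why the partitioning key matters, here is a rough base R sketch of the idea. It is not what multidplyr does internally; n_shards, shard_of_key and filter_shard are names I made up for illustration, and lapply() stands in for the workers. Every (ID, TYPE) group is sent whole to exactly one shard, so the filter can be applied to each shard independently and the pieces simply stacked afterwards.

    ```r
    # Toy data from the question
    df <- data.frame(
      ID    = rep(1:4, each = 3),
      USAGE = c("private", "private", "private", "private", "taxi", "private",
                "taxi", "taxi", "taxi", "taxi", "private", "taxi"),
      TYPE  = c("VW", "VW", "VW", "VW", "MER", "VW",
                "VW", "VW", "VW", "VW", "VW", "VW"),
      stringsAsFactors = FALSE
    )

    n_shards <- 2  # hypothetical number of workers

    # One key per (ID, TYPE) group; all rows of a group share the same key
    key <- paste(df$ID, df$TYPE, sep = "|")

    # Assign each distinct key to one shard, so no group is split across shards
    shard_of_key <- (match(key, unique(key)) - 1) %% n_shards + 1
    shards <- split(df, shard_of_key)

    # Per-shard work: keep the groups that have more than one distinct USAGE
    filter_shard <- function(d) {
      k <- paste(d$ID, d$TYPE, sep = "|")
      n_usage <- ave(seq_len(nrow(d)), k,
                     FUN = function(i) length(unique(d$USAGE[i])))
      d[n_usage > 1, , drop = FALSE]
    }

    # Sequential stand-in for the parallel step, followed by the "collect"
    result <- do.call(rbind, lapply(shards, filter_shard))
    rownames(result) <- NULL
    result
    ```

    If the rows of one group were spread over several shards, each shard would count distinct USAGE values on its own fragment only and could wrongly drop the group.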
    

    Verification

    You'll still get the warning about group_indices, but the results are the same as the original dplyr method.

    slow <- df %>%
      group_by(ID, TYPE) %>%
      filter(n_distinct(USAGE) > 1)
    
    fast == slow
    #       ID USAGE TYPE
    #[1,] TRUE  TRUE TRUE
    #[2,] TRUE  TRUE TRUE
    #[3,] TRUE  TRUE TRUE
    

    Benchmarking

    Now the big question: is it faster? Defining the cluster explicitly lets us ensure that we're using all cores.

    library(microbenchmark)
    library(parallel)
    
    cluster <- create_cluster(cores = detectCores())
    
    fast_func <- function(df) {
      df %>%
        partition(ID, TYPE, cluster = cluster) %>%
        group_by(ID, TYPE) %>%
        filter(n_distinct(USAGE) > 1) %>%
        collect()
    }
    
    slow_func <- function(df) {
      df %>%
        group_by(ID, TYPE) %>%
        filter(n_distinct(USAGE) > 1)
    }
    
    microbenchmark(fast_func(df), slow_func(df))
    # Unit: milliseconds
    # expr       min        lq      mean    median        uq       max neval cld
    # fast_func(df) 41.360358 47.529695 55.806609 50.529851 61.459433 133.53045   100   b
    # slow_func(df)  4.717761  6.974897  9.333049  7.796686  8.468594  49.51916   100  a 
    

    Using parallel processing is actually slower in this case. The mean run for fast_func takes about 56 milliseconds instead of 9 (the medians are about 51 and 8). That's because of the overhead of moving data to and from the cluster's workers. But you said your data has millions of rows, so let's try that.
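
    If you want a feel for that overhead on your own machine, a crude check is to compare the time needed to serialize the data and read it back (roughly what has to happen when rows are shipped to a worker and collected again) with the time for the grouped filter itself. This is only a sketch under my own assumptions: the 120,000-row size is arbitrary, the filter is a base R stand-in for the dplyr pipeline, and real cluster communication adds further costs that are not measured here.

    ```r
    df <- data.frame(
      ID    = rep(1:4, each = 3),
      USAGE = c("private", "private", "private", "private", "taxi", "private",
                "taxi", "taxi", "taxi", "taxi", "private", "taxi"),
      TYPE  = c("VW", "VW", "VW", "VW", "MER", "VW",
                "VW", "VW", "VW", "VW", "VW", "VW"),
      stringsAsFactors = FALSE
    )

    # Hypothetical test size: 12 rows repeated 10,000 times each
    big <- df[rep(seq_len(nrow(df)), each = 10000), ]

    # Cost of turning the data into bytes and back again
    t_ship <- system.time({
      bytes <- serialize(big, connection = NULL)
      back  <- unserialize(bytes)
    })[["elapsed"]]

    # Cost of the grouped filter (base R stand-in for group_by() + filter())
    t_work <- system.time({
      k <- paste(big$ID, big$TYPE, sep = "|")
      n_usage <- ave(seq_len(nrow(big)), k,
                     FUN = function(i) length(unique(big$USAGE[i])))
      kept <- big[n_usage > 1, ]
    })[["elapsed"]]

    c(ship = t_ship, work = t_work)
    ```

    When the per-group work is as cheap as counting distinct values, the shipping cost is hard to win back by splitting the work across cores.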

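    # Embiggen the data: each of the 12 rows is repeated 2,000,000 times (24 million rows)
    df <- df[rep(seq_len(nrow(df)), each=2000000),] %>% tbl_df()
    
    # times = 10 keeps the run short (hence neval = 10 in the output below)
    microbenchmark(fast_func(df), slow_func(df), times = 10)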
    # Unit: seconds
    # expr       min        lq      mean    median        uq       max neval cld
    # fast_func(df) 43.067089 43.781144 50.754600 49.440864 55.308355 65.499095    10   b
    # slow_func(df)  1.741674  2.550008  3.529607  3.246665  3.983452  7.214484    10  a 
    

    With the giant dataset, fast_func is still slower: a median of about 49 seconds against about 3. There are times when running in parallel will save enormous amounts of time, but a simple grouped filter is not necessarily one of them.
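
    One caveat on the code itself: create_cluster() and partition(ID, TYPE, cluster = ...) come from the early development version of multidplyr. In the more recent releases, as far as I know, the cluster is created with new_cluster(), the data is grouped first, and partition() takes the cluster and keeps each group on a single worker. A minimal sketch under that assumption (the choice of 2 workers is arbitrary, and I have not re-run the benchmarks with this version):

    ```r
    library(dplyr)
    library(multidplyr)

    df <- data.frame(
      ID    = rep(1:4, each = 3),
      USAGE = c("private", "private", "private", "private", "taxi", "private",
                "taxi", "taxi", "taxi", "taxi", "private", "taxi"),
      TYPE  = c("VW", "VW", "VW", "VW", "MER", "VW",
                "VW", "VW", "VW", "VW", "VW", "VW")
    )

    cluster <- new_cluster(2)          # arbitrary number of workers
    cluster_library(cluster, "dplyr")  # make n_distinct() available on the workers

    fast_new <- df %>%
      group_by(ID, TYPE) %>%           # group first ...
      partition(cluster) %>%           # ... so each group lands on one worker
      filter(n_distinct(USAGE) > 1) %>%
      collect()
    ```

    The conclusion about overhead should not depend on which API you use: the data still has to travel to the workers and back.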