I have the following dataframe, and my intention is to find all the IDs that have different USAGE but the same TYPE.
ID <- rep(1:4, each=3)
USAGE <- c("private","private","private","private",
"taxi","private","taxi","taxi","taxi","taxi","private","taxi")
TYPE <- c("VW","VW","VW","VW","MER","VW","VW","VW","VW","VW","VW","VW")
df <- data.frame(ID,USAGE,TYPE)
If I run
df %>% group_by(ID, TYPE) %>% filter(n_distinct(USAGE)>1)
I get the intended result. But my original dataframe has more than 2 million rows, so I would like to use all my cores to run this operation.
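On the toy data above, the only ID that pairs a single TYPE with more than one USAGE is ID 4, so the filter should keep just these three rows (tibble header omitted):
#   ID   USAGE TYPE
#    4    taxi   VW
#    4 private   VW
#    4    taxi   VW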
I tried this code with multidplyr:
f1 <- partition(df, ID)
f2 <- f1 %>% group_by(ID, TYPE) %>% filter(n_distinct(USAGE)>1)
f3 <- collect(f2)
But then the following messages appear:
Warning message: group_indices_.grouped_df ignores extra arguments
after
f1 <- partition(df, ID)
and
Error in checkForRemoteErrors(lapply(cl, recvResult)) :
4 nodes produced errors; first error: Evaluation error: object 'f1' not found.
after
f2 <- f1 %>% group_by(ID, TYPE) %>% filter(f1, n_distinct(USAGE) > 1)
What would be the correct way to implement the whole operation in multidplyr? Thanks a lot.
You should include all grouping variables in your call to partition(). That way, each core has all of the data needed to perform a calculation for a given group.
library(tidyverse)
library(multidplyr)
fast <- df %>%
  partition(ID, TYPE) %>%
  group_by(ID, TYPE) %>%
  filter(n_distinct(USAGE) > 1) %>%
  collect()
You'll still get the warning about group_indices, but the results are the same as with the original dplyr method:
slow <- df %>%
  group_by(ID, TYPE) %>%
  filter(n_distinct(USAGE) > 1)
fast == slow
#        ID USAGE TYPE
# [1,] TRUE  TRUE TRUE
# [2,] TRUE  TRUE TRUE
# [3,] TRUE  TRUE TRUE
Now the big question: is it faster? Defining the cluster ourselves lets us ensure that we're using all cores.
library(microbenchmark)
library(parallel)
cluster <- create_cluster(cores = detectCores())
fast_func <- function(df) {
  df %>%
    partition(ID, TYPE, cluster = cluster) %>%
    group_by(ID, TYPE) %>%
    filter(n_distinct(USAGE) > 1) %>%
    collect()
}
slow_func <- function(df) {
  df %>%
    group_by(ID, TYPE) %>%
    filter(n_distinct(USAGE) > 1)
}
microbenchmark(fast_func(df), slow_func(df))
# Unit: milliseconds
# expr min lq mean median uq max neval cld
# fast_func(df) 41.360358 47.529695 55.806609 50.529851 61.459433 133.53045 100 b
# slow_func(df) 4.717761 6.974897 9.333049 7.796686 8.468594 49.51916 100 a
Using parallel processing is actually slower in this case. The mean run for fast_func
takes 56 milliseconds instead of 9. That's because of the overhead associated with managing the flow of data across the cluster nodes. But you said your data has millions of rows, so let's try that.
# Embiggen the data
df <- df[rep(seq_len(nrow(df)), each=2000000),] %>% tbl_df()
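# 12 rows repeated 2,000,000 times each gives 24 million rows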
microbenchmark(fast_func(df), slow_func(df))
# Unit: seconds
# expr min lq mean median uq max neval cld
# fast_func(df) 43.067089 43.781144 50.754600 49.440864 55.308355 65.499095 10 b
# slow_func(df) 1.741674 2.550008 3.529607 3.246665 3.983452 7.214484 10 a
With the giant dataset, fast_func is still slower! There are times when running in parallel will save enormous amounts of time, but a simple grouped filter is not necessarily one of them.
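For contrast, here's a minimal, unbenchmarked sketch of the kind of workload where partition() can pay off: an expensive computation per group, so the work dwarfs the cost of shipping data to the workers. The boot_share helper and its resample count are made up for illustration; cluster_assign_value() is the old multidplyr API (matching create_cluster() above) for copying a helper to each worker.

library(dplyr)
library(multidplyr)
library(parallel)

cluster <- create_cluster(cores = detectCores())

# Hypothetical expensive per-group statistic: a bootstrap estimate of
# the share of "taxi" rows within each ID (1,000 resamples per group).
boot_share <- function(x, n = 1000) {
  mean(replicate(n, mean(sample(x, replace = TRUE) == "taxi")))
}

# Each worker needs its own copy of the helper function.
cluster_assign_value(cluster, "boot_share", boot_share)

df %>%
  partition(ID, cluster = cluster) %>%
  summarise(taxi_share = boot_share(USAGE)) %>%
  collect()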