Tags: r, summarize, sparklyr

Issue with using n_distinct in sparklyr to count distinct values based on condition


I'm encountering an issue while trying to count the number of distinct values in a Spark DataFrame column based on a condition using sparklyr. Here's the code I'm using:

library(sparklyr)
library(dplyr)

# Sample data
df <- data.frame(
  appl = c("Apple", "Microsoft", "Google", "Amazon", "Facebook", "Samsung", "IBM"),
  appl_y = c("y", "n", "y", "n", "y", "n", "y"),
  manu = c("USA", "USA", "USA", "China", "USA", "South Korea", "USA"),
  alternate_flag = c("y", "n", "y", "y", "n", "y", "n")
)

# Connect to Spark
sc <- spark_connect(master = "local")

# Create the Spark DataFrame
df_spark <- copy_to(sc, df, "df_spark")

# Group by 'manu' and summarize
result <- df_spark %>%
  group_by(manu) %>%
  summarize(
    num_appl_y = n_distinct(appl[appl_y == 'y']),
    num_appl_flag = n_distinct(appl[alternate_flag == 'y'])
  )

# Show the result
collect(result)

The intention is to group the data by the manu column and then count the number of distinct values in the appl column where the corresponding appl_y or alternate_flag column is 'y' within each group. However, this doesn't work in sparklyr: the counts come back wrong.


Solution

  • There's an open issue for this: n_distinct() translation issue #3253.
    Currently the query gets translated to:

    SELECT
      `manu`,
      COUNT(DISTINCT(ARRAY(CASE WHEN (`appl_y` = "y") THEN (`appl`) END))) AS `num_appl_y`,
      COUNT(DISTINCT(ARRAY(CASE WHEN (`alternate_flag` = "y") THEN (`appl`) END))) AS `num_appl_flag`
    FROM `df_spark`
    GROUP BY `manu`
    

    The NA / empty values in those arrays throw off the distinct counts.
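    For comparison, a hand-written Spark SQL form of the intended query is sketched below. The point is that COUNT(DISTINCT ...) ignores NULLs, so a bare CASE WHEN without the ARRAY wrapper counts only the appl values where the flag is 'y'; table and column names are taken from the example above:

    ```sql
    SELECT
      `manu`,
      -- CASE WHEN yields NULL for non-matching rows; COUNT(DISTINCT ...) skips NULLs
      COUNT(DISTINCT CASE WHEN `appl_y` = 'y' THEN `appl` END) AS `num_appl_y`,
      COUNT(DISTINCT CASE WHEN `alternate_flag` = 'y' THEN `appl` END) AS `num_appl_flag`
    FROM `df_spark`
    GROUP BY `manu`
    ```

    This is what the dplyr translation would ideally produce; the extra ARRAY(...) wrapper in the generated SQL is what breaks it.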

    Maybe I'm missing something, but what about counting the "y" values with sum(appl_y == "y") and sum(alternate_flag == "y")?

    library(sparklyr)
    library(dplyr)
    
    df <- data.frame(
      appl = c("Apple", "Microsoft", "Google", "Amazon", "Facebook", "Samsung", "IBM"),
      appl_y = c("y", "n", "y", "n", "y", "n", "y"),
      manu = c("USA", "USA", "USA", "China", "USA", "South Korea", "USA"),
      alternate_flag = c("y", "n", "y", "y", "n", "y", "n")
    )
    
    # Connect to Spark (2.4.3)
    sc <- spark_connect(master = "local")
    
    # Create the Spark DataFrame
    df_spark <- copy_to(sc, df, "df_spark")
    
    # Spark summary
    result <- df_spark %>% 
      group_by(manu) %>% 
      summarise(
        # sparklyr / Spark need a little help with boolean-to-numeric casting
        num_appl_y = sum(as.integer(appl_y == "y")),
        num_appl_flag = sum(as.integer(alternate_flag == "y"))
      )
    # Show the result
    collect(result)
    #> Warning: Missing values are always removed in SQL aggregation functions.
    #> Use `na.rm = TRUE` to silence this warning
    #> This warning is displayed once every 8 hours.
    #> # A tibble: 3 × 3
    #>   manu        num_appl_y num_appl_flag
    #>   <chr>            <dbl>         <dbl>
    #> 1 South Korea          0             1
    #> 2 China                0             1
    #> 3 USA                  4             2
    
    show_query(result)
    #> <SQL>
    #> SELECT
    #>   `manu`,
    #>   SUM(CAST(`appl_y` = "y" AS INT)) AS `num_appl_y`,
    #>   SUM(CAST(`alternate_flag` = "y" AS INT)) AS `num_appl_flag`
    #> FROM `df_spark`
    #> GROUP BY `manu`
    
    

    Plain dplyr on the local df for reference:

    # reference summary
    df %>%
      group_by(manu) %>%
      summarize(
        num_appl_y = n_distinct(appl[appl_y == 'y']),
        num_appl_flag = n_distinct(appl[alternate_flag == 'y'])
      )
    #> # A tibble: 3 × 3
    #>   manu        num_appl_y num_appl_flag
    #>   <chr>            <int>         <int>
    #> 1 China                0             1
    #> 2 South Korea          0             1
    #> 3 USA                  4             2
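
    One caveat on the sum() trick: it counts matching rows, not distinct appl values, which happens to agree with n_distinct() here only because each appl appears once per group. If a true distinct count is needed on the Spark side, one workaround sketch (using only verbs sparklyr translates: filter, distinct, count, full_join; names follow the example above) is:

    ```r
    library(sparklyr)
    library(dplyr)

    # Distinct appl per manu where appl_y == "y"
    num_y <- df_spark %>%
      filter(appl_y == "y") %>%
      distinct(manu, appl) %>%
      count(manu, name = "num_appl_y")

    # Distinct appl per manu where alternate_flag == "y"
    num_flag <- df_spark %>%
      filter(alternate_flag == "y") %>%
      distinct(manu, appl) %>%
      count(manu, name = "num_appl_flag")

    # Combine the two per-flag counts
    result_distinct <- full_join(num_y, num_flag, by = "manu")
    collect(result_distinct)
    ```

    Note that groups with no matches for a flag drop out of that flag's side of the join and come back as NA rather than 0, so an ifelse(is.na(...), 0, ...) cleanup may be needed to exactly match the local dplyr output.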