I'm encountering an issue while trying to count the number of distinct values in a Spark DataFrame column based on a condition using sparklyr. Here's the code I'm using:
library(sparklyr)
library(dplyr)
# Sample data
df <- data.frame(
  appl = c("Apple", "Microsoft", "Google", "Amazon", "Facebook", "Samsung", "IBM"),
  appl_y = c("y", "n", "y", "n", "y", "n", "y"),
  manu = c("USA", "USA", "USA", "China", "USA", "South Korea", "USA"),
  alternate_flag = c("y", "n", "y", "y", "n", "y", "n")
)
# Connect to Spark
sc <- spark_connect(master = "local")
# Create the Spark DataFrame
df_spark <- copy_to(sc, df, "df_spark")
# Group by 'manu' and summarize
result <- df_spark %>%
  group_by(manu) %>%
  summarize(
    num_appl_y = n_distinct(appl[appl_y == 'y']),
    num_appl_flag = n_distinct(appl[alternate_flag == 'y'])
  )

# Show the result
collect(result)
The intention is to group the data by the manu column and, within each group, count the distinct appl values where appl_y is 'y' (and, separately, where alternate_flag is 'y'). However, this doesn't work: the counts come out wrong when I do it this way in sparklyr.
There's an open sparklyr issue about this: n_distinct() translation issue #3253. Currently the query gets translated to:
SELECT
`manu`,
COUNT(DISTINCT(ARRAY(CASE WHEN (`appl_y` = "y") THEN (`appl`) END))) AS `num_appl_y`,
COUNT(DISTINCT(ARRAY(CASE WHEN (`alternate_flag` = "y") THEN (`appl`) END))) AS `num_appl_flag`
FROM `df_spark`
GROUP BY `manu`
and the NA/empty values in those arrays throw the counts off.
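For comparison, here is a sketch of the query without the ARRAY wrapper. Since SQL's COUNT(DISTINCT ...) ignores NULLs, the non-matching rows would simply drop out and the distinct counts would come out right:

```sql
SELECT
  `manu`,
  COUNT(DISTINCT CASE WHEN (`appl_y` = "y") THEN (`appl`) END) AS `num_appl_y`,
  COUNT(DISTINCT CASE WHEN (`alternate_flag` = "y") THEN (`appl`) END) AS `num_appl_flag`
FROM `df_spark`
GROUP BY `manu`
```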
Maybe I'm missing something, but what about counting the "y" values with sum(appl_y == "y") and sum(alternate_flag == "y")?
library(sparklyr)
library(dplyr)
df <- data.frame(
  appl = c("Apple", "Microsoft", "Google", "Amazon", "Facebook", "Samsung", "IBM"),
  appl_y = c("y", "n", "y", "n", "y", "n", "y"),
  manu = c("USA", "USA", "USA", "China", "USA", "South Korea", "USA"),
  alternate_flag = c("y", "n", "y", "y", "n", "y", "n")
)
# Connect to Spark (2.4.3)
sc <- spark_connect(master = "local")
# Create the Spark DataFrame
df_spark <- copy_to(sc, df, "df_spark")
# Spark summary
result <- df_spark %>%
  group_by(manu) %>%
  summarise(
    # sparklyr / Spark need a little help with boolean-to-numeric casting
    num_appl_y = sum(as.integer(appl_y == "y")),
    num_appl_flag = sum(as.integer(alternate_flag == "y"))
  )
# Show the result
collect(result)
#> Warning: Missing values are always removed in SQL aggregation functions.
#> Use `na.rm = TRUE` to silence this warning
#> This warning is displayed once every 8 hours.
#> # A tibble: 3 × 3
#> manu num_appl_y num_appl_flag
#> <chr> <dbl> <dbl>
#> 1 South Korea 0 1
#> 2 China 0 1
#> 3 USA 4 2
show_query(result)
#> <SQL>
#> SELECT
#> `manu`,
#> SUM(CAST(`appl_y` = "y" AS INT)) AS `num_appl_y`,
#> SUM(CAST(`alternate_flag` = "y" AS INT)) AS `num_appl_flag`
#> FROM `df_spark`
#> GROUP BY `manu`
Plain dplyr on the local df, for reference:
# reference summary
df %>%
  group_by(manu) %>%
  summarize(
    num_appl_y = n_distinct(appl[appl_y == 'y']),
    num_appl_flag = n_distinct(appl[alternate_flag == 'y'])
  )
#> # A tibble: 3 × 3
#> manu num_appl_y num_appl_flag
#> <chr> <int> <int>
#> 1 China 0 1
#> 2 South Korea 0 1
#> 3 USA 4 2
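One caveat: sum(appl_y == "y") counts matching rows rather than distinct appl values. The two agree here only because every appl appears at most once per manu group; with duplicates the sums would overcount. A sketch of a workaround that keeps the distinct semantics is to NA out the non-matching rows before n_distinct() instead of subsetting; dbplyr translates if_else() to a CASE WHEN, and COUNT(DISTINCT ...) drops NULLs on the SQL side. I haven't verified this against a live Spark session, so it's shown below with plain dplyr on the local df, where n_distinct() needs na.rm = TRUE to get the same NULL-dropping behaviour:

```r
library(dplyr)

df <- data.frame(
  appl = c("Apple", "Microsoft", "Google", "Amazon", "Facebook", "Samsung", "IBM"),
  appl_y = c("y", "n", "y", "n", "y", "n", "y"),
  manu = c("USA", "USA", "USA", "China", "USA", "South Korea", "USA"),
  alternate_flag = c("y", "n", "y", "y", "n", "y", "n")
)

# Replace non-matching rows with NA, then count distinct non-NA values.
# On Spark the translated COUNT(DISTINCT CASE WHEN ... END) ignores NULLs
# automatically; locally we ask n_distinct() to drop them via na.rm = TRUE.
local_res <- df %>%
  group_by(manu) %>%
  summarize(
    num_appl_y = n_distinct(if_else(appl_y == "y", appl, NA_character_), na.rm = TRUE),
    num_appl_flag = n_distinct(if_else(alternate_flag == "y", appl, NA_character_), na.rm = TRUE)
  )

local_res
```

This matches the dplyr reference summary above, and unlike the sum() approach it stays correct if the same appl ever appears on more than one row within a group.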