There are a number of questions asking precisely the same thing, but none within the context of a sparklyr environment. How does one group by one column and concatenate the values of another column into a list? For example, the following produces the desired output in a local R environment.
library(dplyr)

mtcars %>%
distinct(gear, cyl) %>%
group_by(gear) %>%
summarize(test_list = paste0(cyl, collapse = ";")) %>%
select(gear, test_list) %>%
as.data.frame() %>%
print()
  gear test_list
1    3     6;8;4
2    4       6;4
3    5     4;8;6
But registering that same table in Spark and running the same code errors out on the summarize with a SQL parsing error (presumably sparklyr translates the call into Spark SQL rather than dispatching to R's C-based collapse; see the code below). I know PySpark and Spark SQL have a collect_set() function that achieves the desired effect. Is there something analogous for sparklyr?
library(sparklyr)
# sc is an existing spark_connect() connection
sdf_copy_to(sc, x = mtcars, name = "mtcars_test")
tbl(sc, "mtcars_test") %>%
distinct(gear, cyl) %>%
group_by(gear) %>%
summarize(test_list = paste0(cyl, collapse = ";"))
Error: org.apache.spark.sql.catalyst.parser.ParseException:
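For what it's worth, the SQL that sparklyr generates (and that Spark's parser rejects) can be inspected without executing it, via dplyr's show_query(); a diagnostic sketch against the same table:

tbl(sc, "mtcars_test") %>%
  distinct(gear, cyl) %>%
  group_by(gear) %>%
  summarize(test_list = paste0(cyl, collapse = ";")) %>%
  show_query()  # prints the translated SQL instead of running it on Spark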
In PySpark, the following approach is similar (except the concatenated column is an array, which can then be collapsed).
from pyspark.sql.functions import collect_set
df2 = spark.table("mtcars_test")
df2.groupby("gear").agg(collect_set('cyl')).createOrReplaceTempView("mtcars_test_cont")
display(spark.table("mtcars_test_cont"))
gear  collect_set(cyl)
3     [8, 4, 6]
4     [4, 6]
5     [8, 4, 6]
Instead of using R functions, you can use Spark SQL syntax directly by wrapping it in the sql() function from dbplyr. Below is an example script to get the desired output:
sdf_copy_to(sc, x = mtcars, name = "mtcars_test")
tbl(sc, "mtcars_test") %>%
group_by(gear) %>%
summarize(test_list = sql("array_join(collect_set(cast(cyl as int)), ';')"))
#>  gear test_list
#> <dbl> <chr>
#>     4 6;4
#>     3 6;4;8
#>     5 6;4;8
I just changed the last line of your code, where you used the paste0() function.
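If element order or duplicates matter, the same pattern extends naturally; a small untested variation using Spark SQL's sort_array() (and collect_list() in place of collect_set() if duplicates should be kept):

tbl(sc, "mtcars_test") %>%
  group_by(gear) %>%
  summarize(
    # sort_array() makes the element order deterministic;
    # substitute collect_list() for collect_set() to keep duplicates
    test_list = sql("array_join(sort_array(collect_set(cast(cyl as int))), ';')")
  )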
This is one reason why I prefer SparkR over sparklyr: almost all PySpark syntax works the same way in SparkR.
library(magrittr)  # for the %>% pipe

SparkR::agg(
SparkR::group_by(SparkR::createDataFrame(mtcars), SparkR::column("gear")),
test_list = SparkR::array_join(
SparkR::collect_set(SparkR::cast(SparkR::column("cyl"), "integer")),
";"
)
) %>%
SparkR::collect()
#> gear test_list
#>    4       6;4
#>    3     6;4;8
#>    5     6;4;8
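For completeness, sparklyr also passes functions it cannot translate through to Spark SQL verbatim, so the same aggregate can often be written in sparklyr without the explicit sql() wrapper; a sketch, assuming the mtcars_test table registered above:

tbl(sc, "mtcars_test") %>%
  group_by(gear) %>%
  # collect_set() and array_join() are not R functions, so sparklyr
  # forwards them to Spark SQL unchanged
  summarize(test_list = array_join(collect_set(as.integer(cyl)), ";"))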