I keep progressing in the world of SparkR, and I am now facing an issue which I am unable to resolve.
When manipulating SparkDataFrames, I may want to update some columns or aggregate others. I learned how to do it on a case-by-case basis, i.e. column by column.
Let's take an example:
library(SparkR)
library(magrittr)
# Creating SDF
nb.row <- 10
nb.col <- 10
m <- matrix(runif(n=nb.row*nb.col, min = 0, max = 100), nb.row, nb.col)
sdf <- createDataFrame(data.frame(ID = 1:10, CODE = base::sample(LETTERS[1:2]), V = m))
sdf <- withColumn(sdf, "V_1", sdf$V_1 * 1000)
sdf <- withColumn(sdf, "V_2", sdf$V_2 * 1000)
agg1 <- agg(groupBy(sdf, "CODE"), "SV_6" = sum(sdf$V_6), "SV_7" = sum(sdf$V_7))
My question is: how do I handle these cases when I don't know in advance the list of columns I want to work on? (Easy in base R, this seems insurmountable to me in SparkR...)
list.var.1 <- paste0("V_", 1:5)
for (i in seq_along(list.var.1)) {
  sdf <- withColumn(sdf, list.var.1[i], sdf[[list.var.1[i]]] * 1000)
}
This gives me the expected result, but is it the simplest script? Is there nothing lighter or more "official"?
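For comparison, here is a minimal sketch of a lighter alternative to that loop: base R's `Reduce` folds `withColumn` over the column names, so each step returns a new SparkDataFrame with one more column rescaled. It assumes SparkR is installed and that `sparkR.session()` can start a local session; the small data frame only mirrors the example above.

```r
library(SparkR)
sparkR.session()  # assumption: a local Spark installation is available

# Small SparkDataFrame shaped like the example; createDataFrame renames "V.1" to "V_1"
m <- matrix(runif(10 * 10, min = 0, max = 100), 10, 10)
sdf <- createDataFrame(data.frame(ID = 1:10, CODE = c("A", "B"), V = m,
                                  stringsAsFactors = FALSE))

list.var.1 <- paste0("V_", 1:5)

# Fold withColumn over the names: at each step the accumulator `df` gets
# column `v` overwritten by itself * 1000; `sdf` is the initial value
sdf_scaled <- Reduce(
  function(df, v) withColumn(df, v, df[[v]] * 1000),
  list.var.1,
  sdf
)
```

Because `withColumn` replaces a column that already exists, the column order stays the same as in `sdf`.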
# Useful functions
DFjoin <- function(left_df, right_df, key = "key", join_type = "left"){
  left_df <- withColumnRenamed(left_df, key, "left_key")
  right_df <- withColumnRenamed(right_df, key, "right_key")
  result <- join(
    left_df, right_df,
    left_df$left_key == right_df$right_key,
    joinType = join_type)
  result <- withColumnRenamed(result, "left_key", key)
  result$right_key <- NULL
  return(result)
}
sum_spark <- function(res, df, gb, col) {
  Cols <- paste0('S', col)
  tmp <- agg(groupBy(df, gb), alias(sum(df[[col]]), Cols))
  # Join on the grouping column passed as gb (instead of a hard-coded "CODE")
  return(DFjoin(res, tmp, gb))
}
# First step to create base SDF called res
res <- SparkR::select(sdf, sdf$CODE) %>% SparkR::distinct()
# Columns to sum (assumed here: the ones aggregated in agg1 above)
list.var.2 <- c("V_6", "V_7")
# Updating res in a for loop with join
for (i in seq_along(list.var.2)){
  res <- sum_spark(res, sdf, "CODE", list.var.2[i])
}
This also gives me the expected result, but the script seems really heavy to me compared with base R. Am I wrong?
I can't find any more information on this, so any help is welcome!
You may refer to this answer on how to use lapply in conjunction with other SparkR functions to get what you want instead of using for loops.
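As a sketch of that lapply approach (assuming a local Spark session and data shaped like the question's), `lapply` can build one Column expression per existing column and a single `select` then applies them all at once. `alias` keeps the original names, so the rescaled columns replace the old ones in place.

```r
library(SparkR)
sparkR.session()  # assumption: a local Spark installation is available

m <- matrix(runif(10 * 10, min = 0, max = 100), 10, 10)
sdf <- createDataFrame(data.frame(ID = 1:10, CODE = c("A", "B"), V = m,
                                  stringsAsFactors = FALSE))
list.var.1 <- paste0("V_", 1:5)

# One Column expression per existing column: the selected ones are scaled
# and renamed back to their original name, the others pass through as-is
new_cols <- lapply(columns(sdf), function(v) {
  if (v %in% list.var.1) alias(sdf[[v]] * 1000, v) else sdf[[v]]
})

# A single select applies every expression in one projection
sdf_scaled <- select(sdf, new_cols)
```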
Below I'm sharing a useful function for applying SparkR::agg to a list of columns, which will serve your purpose:
#' Apply SparkR aggregate function on list of columns
#'
#' This function acts as boilerplate to simplify aggregating multiple
#' columns: it takes the aggregate expressions as a list and applies
#' SparkR::agg to them.
#'
#' @param spark_df Spark dataframe (grouped or usual) on which some SparkR
#' aggregate function is to be applied
#' @param agg_cols_list List of Spark column objects, each wrapping an
#' aggregate function
#'
#' @examples \dontrun{
#' # sdf is a SparkR dataframe having numeric columns "a" & "b"
#' sdf <- SparkR::createDataFrame(data.frame(a = c(1, 2), b = c(1, 5)))
#' sparkr_agg_listargs(sdf,
#' lapply(c("a", "b"), function(x) sum(SparkR::column(x)))
#' )
#' }
sparkr_agg_listargs <- function(spark_df, agg_cols_list) {
  do.call(SparkR::agg, c(list(spark_df), agg_cols_list))
}
Please use SparkR::alias effectively to get the desired names for the new columns.
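Applied to the aggregation in the question, a minimal sketch could look like this. It assumes a local Spark session, and `list.var.2 = V_6 ... V_10` is a hypothetical choice of columns. `lapply` builds the aliased `sum` expressions and `do.call` passes them to `agg` (the same idea as `sparkr_agg_listargs`), so all the sums come out of one grouped aggregation and no join is needed.

```r
library(SparkR)
sparkR.session()  # assumption: a local Spark installation is available

m <- matrix(runif(10 * 10, min = 0, max = 100), 10, 10)
sdf <- createDataFrame(data.frame(ID = 1:10, CODE = c("A", "B"), V = m,
                                  stringsAsFactors = FALSE))
list.var.2 <- paste0("V_", 6:10)  # hypothetical: the columns to sum

# One aggregate expression per column, renamed to "S" + column name via alias
agg_cols <- lapply(list.var.2, function(v) alias(sum(sdf[[v]]), paste0("S", v)))

# do.call spreads the list into agg(grouped, col1, col2, ...)
res <- do.call(agg, c(list(groupBy(sdf, "CODE")), agg_cols))
```

Since everything goes through a single `agg` call, the plan should contain one grouped aggregation, whereas the join loop adds one aggregation plus one join per column.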