I have a dataframe with 100 million rows and ~10,000 columns. The columns are of two types: standard (C_i) followed by dynamic (X_i). This dataframe was obtained after some processing that performed well. Now only 2 steps remain:
Goal:
1. Apply the UDF (using C2) on each of the dynamic columns X_i.
2. Cast each dynamic column to FloatType.
Difficulty: The code below works on small data, but throws a StackOverflowError once the number of columns grows (see the error on 1,500-column data further down).
Minimal code:
import spark.implicits._
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.FloatType

// Sample UDF: appends the last character of the standard column's value
// to the dynamic column's value.
val foo = (s_val: String, t_val: String) => {
  t_val + s_val.takeRight(1)
}
val foos_udf = udf(foo)
spark.udf.register("foos_udf", foo)
val columns = Seq("C1", "C2", "X1", "X2", "X3", "X4")
val data = Seq(
  ("abc", "212", "1", "2", "3", "4"),
  ("def", "436", "2", "2", "1", "8"),
  ("abc", "510", "1", "2", "5", "8")
)
val rdd = spark.sparkContext.parallelize(data)
var df = spark.createDataFrame(rdd).toDF(columns: _*)
df.show()
// Step 1: apply the UDF to every dynamic column (everything after C1, C2).
for (c <- df.columns.drop(2)) {
  df = df.withColumn(c, foos_udf(col("C2"), col(c)))
}
df.show()
// Step 2: cast every dynamic column to FloatType.
for (c <- df.columns.drop(2)) {
  df = df.withColumn(c, col(c).cast(FloatType))
}
df.show()
Error on 1,500-column data:
Exception in thread "main" java.lang.StackOverflowError
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.isStreaming(LogicalPlan.scala:37)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$isStreaming$1.apply(LogicalPlan.scala:37)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$isStreaming$1.apply(LogicalPlan.scala:37)
at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
at scala.collection.immutable.List.exists(List.scala:84)
...
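Each withColumn call wraps the existing logical plan in one more projection, so after ~1,500 iterations the plan is ~1,500 nodes deep, and Catalyst's recursive plan traversal (the exists call in the trace above) overflows the driver's stack. A minimal way to see the nesting, assuming the df from the loop above:

// Printing the analyzed plan after the loop shows one nested Project
// per dynamic column; this is the structure the recursion walks.
df.explain(true)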
Thoughts:
The var could be replaced, but the data is close to 40% of the RAM. The for loop used for the dtype casting could be causing the degradation in performance, though I can't see how, or what the alternatives are. From searching the internet, I have seen people suggest a foldLeft-based approach (sketched below), but that apparently still gets translated to a for loop internally. Any inputs on this would be greatly appreciated.
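For reference, a sketch of that foldLeft variant, assuming the df and foos_udf from the minimal code above; it removes the var, but each step is still one withColumn, so it builds the same chain of nested projections:

// foldLeft threads the DataFrame through the dynamic columns without a var,
// but the resulting logical plan is identical to the for-loop version.
val dfFold = df.columns.drop(2).foldLeft(df) { (acc, c) =>
  acc.withColumn(c, foos_udf(col("C2"), col(c)))
}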
A faster solution was to call the UDF on the row itself rather than on each column. Since Spark stores data as rows, the earlier per-column approach was exhibiting terrible performance.
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct, udf}

// UDF over the whole row (passed in as a struct) instead of per column.
def my_udf(names: Array[String]) = udf[String, Row]((r: Row) => {
  // Read every field of the row once.
  val row = Array.ofDim[String](names.length)
  for (i <- 0 until row.length) {
    row(i) = r.getAs(i)
  }
  ...
})
...
val df2 = df1.withColumn(results_col, my_udf(df1.columns)(struct("*"))).select(col(results_col))
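For a self-contained version of that pattern, here is a runnable sketch; the results_col name and the mkString per-row logic are illustrative placeholders, not the original's processing:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct, udf}

val df1 = df // the DataFrame from the minimal code above
val results_col = "results" // hypothetical output column name
def row_udf(names: Array[String]) = udf[String, Row]((r: Row) => {
  // One UDF invocation per row replaces one invocation per column per row.
  val values = Array.ofDim[String](names.length)
  for (i <- 0 until values.length) {
    values(i) = r.getAs(i)
  }
  values.mkString(",") // placeholder for the real per-row logic
})
val df2 = df1.withColumn(results_col, row_udf(df1.columns)(struct("*")))
  .select(col(results_col))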
Type casting can then be done as suggested by Riccardo.
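Riccardo's answer isn't quoted here; assuming it is the usual single-select cast, a sketch on the question's df (first two columns standard, the rest dynamic):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.FloatType

// Casting in one select adds a single projection to the plan,
// regardless of how many columns are cast.
val casted = df.select(
  df.columns.take(2).map(col) ++
  df.columns.drop(2).map(c => col(c).cast(FloatType).as(c)): _*
)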