Tags: scala, apache-spark, apache-spark-standalone

Spark Scala: Performance degradation with a simple UDF over a large number of columns


I have a dataframe with 100 million rows and ~10,000 columns. The columns are of two types: standard (C_i) followed by dynamic (X_i). This dataframe was obtained after some processing, and performance up to that point was fast. Now only two steps remain:

Goal:

  1. A particular operation needs to be done on every X_i column using an identical subset of the C_i columns.
  2. Convert each X_i column to FloatType.

Difficulty:

  1. Performance degrades terribly with an increasing number of columns.
  2. After a while, only one executor seems to be doing any work (%CPU use < 200%), even on sample data with 100 rows and 1,000 columns. If I push it to 1,500 columns, it crashes.

Minimal code:

import spark.implicits._
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.FloatType

// sample_udf
val foo = (s_val: String, t_val: String) => {
    t_val + s_val.takeRight(1)
}
val foos_udf = udf(foo)
spark.udf.register("foos_udf", foo)

val columns = Seq("C1", "C2", "X1", "X2", "X3", "X4")
val data = Seq(("abc", "212", "1", "2", "3", "4"),("def", "436", "2", "2", "1", "8"),("abc", "510", "1", "2", "5", "8"))

val rdd = spark.sparkContext.parallelize(data)
var df = spark.createDataFrame(rdd).toDF(columns:_*)
df.show()

for (cols <- df.columns.drop(2)) {
    df = df.withColumn(cols, foos_udf(col("C2"),col(cols)))
}
df.show()

for (cols <- df.columns.drop(2)) {
    df = df.withColumn(cols,col(cols).cast(FloatType))
}
df.show()

Error on the 1,500-column data:

Exception in thread "main" java.lang.StackOverflowError
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.isStreaming(LogicalPlan.scala:37)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$isStreaming$1.apply(LogicalPlan.scala:37)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$isStreaming$1.apply(LogicalPlan.scala:37)
    at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
    at scala.collection.immutable.List.exists(List.scala:84)
...

Thoughts:

  1. Perhaps the var could be replaced, but the size of the data is close to 40% of the RAM.
  2. Perhaps the for loop used for dtype casting is degrading performance, though I can't see how or what the alternatives are. From searching the internet, I have seen people suggest a foldLeft-based approach (sketched just below), but apparently that still amounts to the same per-column loop internally.
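
For reference, this is roughly the foldLeft variant people suggest (a sketch over the toy columns above, which I have not benchmarked); it avoids the var, but as far as I can tell it builds the same chained plan as the loop:

// foldLeft over the X_i columns: each step wraps the previous plan in another projection
val casted = df.columns.drop(2).foldLeft(df) { (acc, c) =>
    acc.withColumn(c, col(c).cast(FloatType))
}
casted.show()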

Any inputs on this would be greatly appreciated.


Solution

  • A faster solution was to call the UDF on the row itself rather than on each column separately. Since Spark stores data as rows, the per-column approach was exhibiting terrible performance.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.{col, struct, udf}

    // one UDF call per row: the whole row arrives as a Row via struct("*")
    def my_udf(names: Array[String]) = udf[String, Row]((r: Row) => {
        // copy the row's values into an array indexed the same way as `names`
        val row = Array.ofDim[String](names.length)
        for (i <- 0 until row.length) {
            row(i) = r.getAs[String](i)
        }
        ...   // per-row computation over `row`, returning a String
    })
    ...
    val df2 = df1.withColumn(results_col, my_udf(df1.columns)(struct("*"))).select(col(results_col))
    

    Type casting can be done as suggested by Riccardo.
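
    For completeness, a common pattern for the casting is a single select over all columns instead of a withColumn loop, which keeps the plan shallow. This is only a sketch of that kind of approach (not necessarily Riccardo's exact code), assuming a dataframe df laid out like the toy example in the question, with two C_i columns followed by the X_i columns:

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.FloatType

    // keep the leading C_i columns as-is and cast every X_i column in one select
    val keep  = df.columns.take(2).map(col)
    val casts = df.columns.drop(2).map(c => col(c).cast(FloatType).as(c))
    val dfCast = df.select((keep ++ casts): _*)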