While running the following Spark MLlib job in local mode with Scala 2.12.3, I encountered the following error: lambda not serializable.
Any input would be much appreciated. (Moving to Scala 2.11 is not an option for me.) Can you please let me know what I can do to avoid this issue? Thank you.
import java.io.FileWriter
import org.apache.spark.SparkConf
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.TimestampType
import java.util.concurrent.atomic.AtomicBoolean
/**
 * Continuously retrains a linear-regression pipeline on a pooling-statistics CSV,
 * saves the fitted model, scores a separate test CSV and writes the average
 * prediction to a result file.
 *
 * Spark runs in-process with a `local[2]` master (see [[conf]]).
 */
object MLAnalyzer {
  import org.apache.spark.ml.feature.StandardScaler

  // NOTE(review): "deploy-mode" is not a `spark.`-prefixed key — confirm it has any effect.
  val conf: SparkConf = new SparkConf()
    .setMaster("local[2]")
    .set("deploy-mode", "client")
    .set("spark.driver.bindAddress", "127.0.0.1")
    .set("spark.broadcast.compress", "false")
    .setAppName("local-spark-kafka-consumer-client")

  val spark: SparkSession = SparkSession
    .builder()
    .config(conf)
    .getOrCreate()

  private val TrainPath =
    "/home/vagrant/Desktop/Workspaces/SparkMachineLearning/sparkML/src/main/resources/train_pooling.csv"
  private val TestPath =
    "/home/vagrant/Desktop/Workspaces/SparkMachineLearning/sparkML/src/main/resources/test_pooling.csv"
  private val ResultPath =
    "/home/vagrant/Desktop/Workspaces/SparkMachineLearning/sparkML/src/main/resources/result.csv"
  private val ModelPath =
    "file:///home/vagrant/Downloads/medium-articles-master/titanic_spark/training_batch/src/main/resources/poolSessionModelRecent.model"

  /** Column layout of the training CSV (which has a header row). */
  private val TrainSchema = StructType(
    Array(
      StructField("PACKAGE_KEY", StringType),
      StructField("MOST_IDLE", IntegerType),
      StructField("MAX_WAIT", IntegerType),
      StructField("IDLE_COUNT", IntegerType),
      StructField("APPLICATION", StringType),
      StructField("LONGEST_WAIT", IntegerType),
      StructField("TIMEOUTS", IntegerType),
      StructField("LAST_ACCESS", TimestampType),
      StructField("MOST_ACTIVE", IntegerType),
      StructField("MAX_ACTIVE", IntegerType),
      StructField("MAX_IDLE", IntegerType),
      StructField("ACTIVE_COUNT", IntegerType),
      StructField("FACTOR_LOAD", DoubleType)))

  /** Column layout of the test CSV: feature columns only, read without a header option. */
  private val TestSchema = StructType(
    Array(
      StructField("IDLE_COUNT", IntegerType),
      StructField("TIMEOUTS", IntegerType),
      StructField("ACTIVE_COUNT", IntegerType)))

  /** Training columns that are removed before the pipeline is fitted. */
  private val DroppedColumns = Seq(
    "PACKAGE_KEY", "MOST_IDLE", "MAX_IDLE", "MOST_ACTIVE", "LAST_ACCESS", "APPLICATION", "MAX_WAIT")

  def main(args: Array[String]): Unit = {
    process()
  }

  /**
   * Runs the train / save / predict cycle forever, pausing 100 ms before each round.
   *
   * Each round re-reads the training CSV, fits the pipeline, overwrites the saved
   * model, predicts on the test CSV and, if there is at least one prediction,
   * overwrites the result file with the mean prediction. This method never returns
   * normally.
   */
  def process(): Unit = {
    import spark.implicits._

    // The stage definitions do not change between rounds, so build them once.
    val pipeline = buildPipeline()

    while (true) {
      Thread.sleep(100)

      // Read the raw data, drop the columns not used for training and replace
      // missing numeric values with 0.
      val training = spark.read
        .option("header", "true")
        .schema(TrainSchema)
        .csv(TrainPath)
        .drop(DroppedColumns: _*)
        .na.fill(0)

      // Fit the pipeline and persist the resulting model, replacing any previous one.
      val model = pipeline.fit(training)
      model.write.overwrite().save(ModelPath)

      val testData = spark.read
        .schema(TestSchema)
        .csv(TestPath)
        .na.fill(0)

      // No RMSE is computed here: the test file carries no label column (see
      // TestSchema), so there is nothing to compare the predictions against.
      val predictions = model
        .transform(testData)
        .select("prediction")
        .as[Double]
        .collect()

      if (predictions.nonEmpty) {
        writeAverage(predictions.sum / predictions.length)
        println("completed modelling process")
      }
    }
  }

  /**
   * Builds the unfitted pipeline: assemble the three count columns into a vector,
   * standardise it (mean and standard deviation) into "features", then fit a
   * linear regression on the "FACTOR_LOAD" label.
   */
  private def buildPipeline(): Pipeline = {
    val assembler = new VectorAssembler()
      .setInputCols(Array("IDLE_COUNT", "TIMEOUTS", "ACTIVE_COUNT"))
      .setOutputCol("features_intermediate")

    val scaler = new StandardScaler()
      .setWithMean(true)
      .setWithStd(true)
      .setInputCol("features_intermediate")
      .setOutputCol("features")

    val regression = new LinearRegression()
      .setMaxIter(100)
      .setRegParam(0.1)
      .setElasticNetParam(0.8)
      .setLabelCol("FACTOR_LOAD")

    new Pipeline().setStages(Array(assembler, scaler, regression))
  }

  /** Overwrites the result file with `avg`, closing the writer even if the write fails. */
  private def writeAverage(avg: Double): Unit = {
    val writer = new FileWriter(ResultPath)
    try {
      writer.append(avg.toString)
      writer.flush()
    } finally {
      writer.close()
    }
  }
}
This gives me the following error:
Caused by: java.io.NotSerializableException: scala.runtime.LazyRef
Serialization stack:
- object not serializable (class: scala.runtime.LazyRef, value: LazyRef thunk)
- element of array (index: 2)
- array (class [Ljava.lang.Object;, size 3)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.catalyst.expressions.ScalaUDF, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/catalyst/expressions/ScalaUDF.$anonfun$f$2:(Lscala/Function1;Lorg/apache/spark/sql/catalyst/expressions/Expression;Lscala/runtime/LazyRef;Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;, instantiatedMethodType=(Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;, numCaptured=3])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$2280/878458383, org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$2280/878458383@65af23c0)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(named_struct(IDLE_COUNT_double_vecAssembler_bc4ee3d99e56, cast(coalesce(IDLE_COUNT#1732, 0) as double), TIMEOUTS_double_vecAssembler_bc4ee3d99e56, cast(coalesce(TIMEOUTS#1735, 0) as double), ACTIVE_COUNT_double_vecAssembler_bc4ee3d99e56, cast(coalesce(ACTIVE_COUNT#1740, 0) as double))))
- field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Alias, UDF(named_struct(IDLE_COUNT_double_vecAssembler_bc4ee3d99e56, cast(coalesce(IDLE_COUNT#1732, 0) as double), TIMEOUTS_double_vecAssembler_bc4ee3d99e56, cast(coalesce(TIMEOUTS#1735, 0) as double), ACTIVE_COUNT_double_vecAssembler_bc4ee3d99e56, cast(coalesce(ACTIVE_COUNT#1740, 0) as double))) AS features_intermediate#1839)
- element of array (index: 0)
Upgrading to Scala 2.12.8 solved the issue. I am not sure about the root cause, though.