scala, apache-spark, serialization, hive, udf

Create a SparkSQL UDF with non-serializable objects


I'm trying to write a UDF that I would like to use on Hive tables in an sqlContext. Is it in any way possible to include objects from other libraries that are not serializable? Here's a minimal example of what does not work:

def myUDF(s: String) = {
  import sun.misc.BASE64Encoder
  val coder = new BASE64Encoder
  val encoded = coder.encode(s.getBytes("UTF-8"))
  encoded
}

I register the function in the Spark shell as a UDF:

val encoding = sqlContext.udf.register("encoder", myUDF)

If I try to run it on a table "test"

sqlContext.sql("SELECT encoder(colname) from test").show()

I get the error

org.apache.spark.SparkException: Task not serializable
object not serializable (class: sun.misc.BASE64Encoder, value: sun.misc.BASE64Encoder@4a7f9a94)

Is there a workaround for this? I tried embedding myUDF in an object and in a class, but that didn't work either.


Solution

  • You can try defining the udf function as

    import org.apache.spark.sql.functions.{col, udf}

    def encoder = udf((s: String) => {
      import sun.misc.BASE64Encoder
      val coder = new BASE64Encoder
      val encoded = coder.encode(s.getBytes("UTF-8"))
      encoded
    })
    

    And call the udf function as

    dataframe.withColumn("encoded", encoder(col("id"))).show
    
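    Alternatively, if you want to keep calling the function through SQL as in the question, you can register the lambda itself (a sketch using the table "test" and column "colname" from the question):

    // Registering the lambda means only the closure is serialized;
    // the BASE64Encoder is constructed inside it, on the executors.
    sqlContext.udf.register("encoder", (s: String) => {
      import sun.misc.BASE64Encoder
      new BASE64Encoder().encode(s.getBytes("UTF-8"))
    })

    sqlContext.sql("SELECT encoder(colname) FROM test").show()

    Because the encoder is created inside the lambda body, nothing non-serializable is captured from the driver, and the original Task not serializable error goes away.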

    Updated

    As @santon pointed out, a BASE64Encoder is instantiated for each row of the dataframe, which might lead to performance issues. The solution is to create a single instance of BASE64Encoder in a static (singleton) object and call it from within the udf function.
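    A minimal sketch of that idea (Base64Helper is a name of my own choosing; a top-level Scala object is a JVM singleton, so the encoder is built once per executor and accessed statically instead of being captured and serialized with the closure):

    import sun.misc.BASE64Encoder
    import org.apache.spark.sql.functions.udf

    // Hypothetical helper: instantiated once per executor JVM, not once per row.
    object Base64Helper {
      val coder = new BASE64Encoder
    }

    def encoder = udf((s: String) => Base64Helper.coder.encode(s.getBytes("UTF-8")))

    As a side note, sun.misc.BASE64Encoder is an internal JDK API; on Java 8+, java.util.Base64.getEncoder().encodeToString(bytes) is the supported, thread-safe alternative.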