I'm trying to write a UDF that I would like to use on Hive tables in a sqlContext. Is it in any way possible to include objects from other libraries that are not serializable? Here's a minimal example of what does not work:
def myUDF(s: String) = {
  import sun.misc.BASE64Encoder
  val coder = new BASE64Encoder
  val encoded = coder.encode(s.getBytes("UTF-8"))
  encoded
}
I register the function in the spark shell as a UDF:
val encoding = sqlContext.udf.register("encoder", myUDF)
If I try to run it on a table "test"
sqlContext.sql("SELECT encoder(colname) from test").show()
I get the error
org.apache.spark.SparkException: Task not serializable
object not serializable (class: sun.misc.BASE64Encoder, value: sun.misc.BASE64Encoder@4a7f9a94)
Is there a workaround for this? I tried embedding myUDF in an object and in a class, but that didn't work either.
You can try defining the udf function as
import org.apache.spark.sql.functions.{col, udf}

def encoder = udf((s: String) => {
  import sun.misc.BASE64Encoder
  // the encoder is created inside the function body, so nothing
  // non-serializable is captured in the task closure
  val coder = new BASE64Encoder
  val encoded = coder.encode(s.getBytes("UTF-8"))
  encoded
})
And call the udf function as
dataframe.withColumn("encoded", encoder(col("id"))).show
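If you need to call it from SQL as in the question, you can register the same kind of function instead of the original def; a minimal sketch, reusing the question's table and column names:

sqlContext.udf.register("encoder", (s: String) => {
  import sun.misc.BASE64Encoder
  new BASE64Encoder().encode(s.getBytes("UTF-8"))
})

sqlContext.sql("SELECT encoder(colname) FROM test").show()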
Updated

As @santon pointed out, the BASE64Encoder is instantiated for each row in the dataframe, which might lead to performance issues. The solution is to create a static object holding the BASE64Encoder and call it from within the udf function.
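A minimal sketch of that approach, assuming compiled code rather than the spark shell (REPL wrapping can pull extra references into the closure); the name Base64Helper is hypothetical:

import org.apache.spark.sql.functions.udf
import sun.misc.BASE64Encoder

object Base64Helper {
  // a Scala object is initialized once per JVM, i.e. once per executor,
  // so the encoder is created once instead of once per row
  val coder = new BASE64Encoder
}

def encoder = udf((s: String) => Base64Helper.coder.encode(s.getBytes("UTF-8")))

Because Base64Helper is referenced only inside the function body, it is loaded on the executors rather than serialized with the task.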