I have a list of org.apache.avro.generic.GenericRecord and its Avro schema, and I need to create a DataFrame from them with the help of the SQLContext API. To create a DataFrame, SQLContext needs an RDD of org.apache.spark.sql.Row together with the schema. So the prerequisite for creating the DF is an RDD of org.apache.spark.sql.Row, which can be built using the code below, but somehow it is not working and gives an error. Sample code:
1. Convert GenericRecord to Row
import scala.collection.JavaConverters._
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.StructType

def convertGenericRecordToRow(genericRecords: Seq[GenericRecord],
                              avroSchema: Schema,
                              schemaType: StructType): Seq[Row] = {
  val fields = avroSchema.getFields.asScala
  genericRecords.map { avroRecord =>
    // Extract each field's value from the record, in schema order
    val avroFieldArr = fields.map(f => avroRecord.get(f.name)).toArray[Any]
    new GenericRowWithSchema(avroFieldArr, schemaType)
  }
}
2. Convert `Avro schema` to `StructType`
Use the `toSqlType` function from `com.databricks.spark.avro.SchemaConverters`; it converts an Avro schema to a Spark SQL `StructType`.
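A minimal sketch of that conversion, assuming `avroSchema` is the `org.apache.avro.Schema` already in scope (`toSqlType` returns a `SchemaType` wrapper, so its `dataType` has to be cast to `StructType` for a record schema):

import com.databricks.spark.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// For an Avro record schema, the resulting dataType is a StructType
val schemaType: StructType =
  SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]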
3. Create `DataFrame` using `SQLContext`
val rowSeq = convertGenericRecordToRow(genericRecords, avroSchema, schemaType)
val rowRdd = sc.parallelize(rowSeq, 1)
val finalDF = sqlContext.createDataFrame(rowRdd, schemaType)
But it throws an error at the creation of the DataFrame. Can someone please help me figure out what is wrong in the above code? Apart from this, if someone has different logic for the conversion and creation of the DataFrame, please share it.
Whenever I invoke any action on the DataFrame, it executes the DAG and tries to create the DF object, but here it fails with the exception below:
ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
Error :Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, hdpoc-c01-r06-01, executor 1): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
After this I tried passing the jar with the matching commons-lang3 version in the --jars parameter of spark-submit, together with --conf spark.driver.userClassPathFirst=true, but now it fails on MapR with:
ERROR CLDBRpcCommonUtils: Exception during init
java.lang.UnsatisfiedLinkError: com.mapr.security.JNISecurity.SetClusterOption(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)
at com.mapr.security.JNISecurity.SetClusterOption(Native Method)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.init(CLDBRpcCommonUtils.java:163)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<init>(CLDBRpcCommonUtils.java:73)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<clinit>(CLDBRpcCommonUtils.java:63)
at org.apache.hadoop.conf.CoreDefaultProperties.<clinit>(CoreDefaultProperties.java:69)
at java.lang.Class.forName0(Native Method)
We are using the MapR distribution, and after this classpath change in spark-submit (sketched below), it fails with the above exception.
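For reference, the spark-submit invocation looked roughly like this (the application class, jar paths, and commons-lang3 version are placeholders, not the actual values):

# Placeholder values: class name, paths, and version are illustrative only
spark-submit \
  --class com.example.AvroToDataFrameApp \
  --jars /path/to/commons-lang3-<version>.jar \
  --conf spark.driver.userClassPathFirst=true \
  app-assembly.jar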
Can someone please help here? My basic need is to convert an Avro GenericRecord into a Spark Row so I can create a DataFrame from it.
Thanks.
While creating a DataFrame from RDD[GenericRecord] there are a few steps:
First convert the GenericRecord into a Row, using com.databricks.spark.avro.SchemaConverters.createConverterToSQL(sourceAvroSchema: Schema, targetSqlType: DataType).
This is a private method in spark-avro up to version 3.2. If you are on 3.2 or earlier, copy this method into your own util class and use it from there; otherwise use it directly.
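A rough sketch of that conversion, assuming the private method has been copied into a hypothetical local AvroUtils object, and that avroSchema and genericRecords are already in scope:

import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

val schemaType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
// AvroUtils.createConverterToSQL is a local copy of the private spark-avro method
val converter = AvroUtils.createConverterToSQL(avroSchema, schemaType)
val rowSeq: Seq[Row] = genericRecords.map(rec => converter(rec).asInstanceOf[Row])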
Then create the DataFrame:
val rowRdd = ssc.sparkContext.parallelize(rowSeq, numPartition)
val dataframe = sparkSession.createDataFrame(rowRdd, schemaType)
This resolves my problem.