apache-spark, apache-spark-sql, avro, mapr, spark-avro

Convert org.apache.avro.generic.GenericRecord to org.apache.spark.sql.Row


I have a list of org.apache.avro.generic.GenericRecord and an Avro schema. Using these, I need to create a DataFrame with the help of the SQLContext API. To create a DataFrame, the API needs an RDD of org.apache.spark.sql.Row and the schema. So the prerequisite is an RDD of org.apache.spark.sql.Row, which should be achievable with the code below, but somehow it is not working and gives an error. Sample code:

 1. Convert GenericRecord to Row

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    import org.apache.spark.sql.types.StructType

    def convertGenericRecordToRow(genericRecords: Seq[GenericRecord], avroSchema: Schema, schemaType: StructType): Seq[Row] = {
      val fields = avroSchema.getFields
      // Seq is a trait and cannot be instantiated with `new`; start from an empty Seq
      var rows = Seq[Row]()
      for (avroRecord <- genericRecords) {
        var avroFieldsSeq = Seq[Any]()
        // Pull each Avro field value out in schema order
        for (i <- 0 until fields.size) {
          avroFieldsSeq = avroFieldsSeq :+ avroRecord.get(fields.get(i).name)
        }
        rows = rows :+ new GenericRowWithSchema(avroFieldsSeq.toArray, schemaType)
      }
      rows
    }

2. Convert `Avro schema` to `StructType`

   Use the `toSqlType` function from `com.databricks.spark.avro.SchemaConverters`; it converts an Avro schema to a `StructType`.
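
   For example, a minimal sketch (assuming `avroSchema` is the Avro `Schema` from step 1):

    import org.apache.spark.sql.types.StructType
    import com.databricks.spark.avro.SchemaConverters

    // toSqlType returns a SchemaType(dataType, nullable); for a record schema the dataType is a StructType
    val schemaType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]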

3. Create `DataFrame` using `SQLContext`

    val rowSeq = convertGenericRecordToRow(genericRecords, avroSchema, schemaType)
    val rowRDD = sc.parallelize(rowSeq, 1)
    val finalDF = sqlContext.createDataFrame(rowRDD, schemaType)

But it is throwing an error at the creation of the DataFrame. Can someone please tell me what is wrong in the above code? Alternatively, if someone has different logic for the conversion and creation of the DataFrame, please share it.

Whenever I invoke any action on the DataFrame, it executes the DAG and tries to create the DF object, but it fails with the exception below:

 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
 Error :Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, hdpoc-c01-r06-01, executor 1): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1
                        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
                        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)

After this I tried supplying the correct version of the jar in the --jars parameter of spark-submit, along with --conf spark.driver.userClassPathFirst=true, but now it is failing on MapR with:

ERROR CLDBRpcCommonUtils: Exception during init
java.lang.UnsatisfiedLinkError: com.mapr.security.JNISecurity.SetClusterOption(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)
                    at com.mapr.security.JNISecurity.SetClusterOption(Native Method)
                    at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.init(CLDBRpcCommonUtils.java:163)
                    at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<init>(CLDBRpcCommonUtils.java:73)
                    at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<clinit>(CLDBRpcCommonUtils.java:63)
                    at org.apache.hadoop.conf.CoreDefaultProperties.<clinit>(CoreDefaultProperties.java:69)
                    at java.lang.Class.forName0(Native Method)

We are using the MapR distribution, and after the classpath change in spark-submit it is failing with the above exception.
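
For reference, this is the shape of the spark-submit invocation described above (the jar path, main class, and application jar below are illustrative placeholders, not our actual values):

    spark-submit \
      --jars /path/to/correct-commons-lang3.jar \
      --conf spark.driver.userClassPathFirst=true \
      --class com.example.AvroToDataFrameJob \
      my-application.jar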

Can someone please help here? My basic need is to convert an Avro GenericRecord into a Spark Row so I can create a DataFrame with it. Please help.
Thanks.


Solution

  • While creating a DataFrame from RDD[GenericRecord] there are a few steps:

    1. First, convert org.apache.avro.generic.GenericRecord into org.apache.spark.sql.Row.

    Use com.databricks.spark.avro.SchemaConverters.createConverterToSQL(sourceAvroSchema: Schema, targetSqlType: DataType).

    This is a private method up to the spark-avro 3.2 release. If you are on version 3.2 or earlier, copy this method into your own utility class and use it from there; otherwise use it directly. A sketch is shown below.
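
    A minimal sketch of wiring this up (assuming the copied method lives in a hypothetical `AvroUtils` object; in spark-avro 3.2 the returned converter has type `AnyRef => AnyRef`):

        import org.apache.avro.generic.GenericRecord
        import org.apache.spark.sql.Row
        import org.apache.spark.sql.types.StructType
        import com.databricks.spark.avro.SchemaConverters

        // Derive the target StructType from the Avro schema
        val schemaType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]

        // createConverterToSQL copied out of spark-avro into our own AvroUtils (hypothetical location)
        val converter = AvroUtils.createConverterToSQL(avroSchema, schemaType)

        // Convert each GenericRecord into a Row
        val rowSeq: Seq[Row] = genericRecords.map(record => converter(record).asInstanceOf[Row])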

    1. Create Dataframe from collection of Row (rowSeq).

        val rowRDD = ssc.sparkContext.parallelize(rowSeq, numPartition)
        val dataframe = sparkSession.createDataFrame(rowRDD, schemaType)

    This resolves my problem.