
How to set an array of records using GenericRecordBuilder


I'm trying to turn a Scala object (i.e. a case class) into a byte array.

To do so, I insert the object's content into a GenericRecordBuilder created from its schema, and then use a GenericDatumWriter to turn the resulting record into a byte array.

I have no problem setting primitive types, or arrays of primitive types, in the GenericRecordBuilder.

However, I need help inserting an array of records into the GenericRecordBuilder and creating a byte array from it.

What is the right way to insert an array of records into the GenericRecordBuilder?

Here is part of what I'm trying to do:

This is the schema:

{
    "type": "record",
    "name": "test1",
    "namespace": "ns",
    "fields": [
      {
        "name": "t_name",
        "type": "string",
        "default": "a"
      },
      {
        "name": "t_num",
        "type": "int",
        "default": 0
      },
      {
        "name": "t_arr",
        "type": [
          "null",
          {
            "type": "array",
            "items": {
              "name": "t_arr_a",
              "type": "record",
              "fields": [
                {
                  "name": "t_arr_f1",
                  "type": "int",
                  "default": 0
                },
                {
                  "name": "t_arr_f2",
                  "type": "int",
                  "default": 0
                }
              ]
            }
          }
        ]
      }
    ]
}
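In this schema, t_arr is a union of "null" and an array, so the record schema for the array items sits two levels down: first the array branch of the union, then that array's element type. A minimal sketch of that navigation, assuming Scala 2.13 (for scala.jdk.CollectionConverters) and the schema inlined as a string instead of loaded from /data/myschema.avsc; SchemaNavigation is a hypothetical name. It selects the union branch by type rather than by its position:

```scala
import org.apache.avro.Schema
import scala.jdk.CollectionConverters._

object SchemaNavigation {
  // Same schema as above, inlined (assumption: not read from /data/myschema.avsc).
  val schemaJson: String =
    """{"type":"record","name":"test1","namespace":"ns","fields":[
      |  {"name":"t_name","type":"string","default":"a"},
      |  {"name":"t_num","type":"int","default":0},
      |  {"name":"t_arr","type":["null",{"type":"array","items":{
      |    "name":"t_arr_a","type":"record","fields":[
      |      {"name":"t_arr_f1","type":"int","default":0},
      |      {"name":"t_arr_f2","type":"int","default":0}]}}]}
      |]}""".stripMargin

  val avroSchema: Schema = new Schema.Parser().parse(schemaJson)

  // "t_arr" is a union ["null", array]; pick the ARRAY branch by its type
  // instead of relying on its index inside the union.
  val arraySchema: Schema = avroSchema.getField("t_arr").schema().getTypes.asScala
    .find(_.getType == Schema.Type.ARRAY)
    .getOrElse(throw new IllegalStateException("t_arr has no array branch"))

  // The array's element type is the nested record schema (it inherits namespace "ns").
  val itemSchema: Schema = arraySchema.getElementType
}
```

Selecting by type keeps working if the union is later reordered to [array, "null"], which a hard-coded getTypes.get(1) would not.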

This is the Scala object that populates the GenericRecordBuilder and transforms the built record into a byte array:

package utils

import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecordBuilder}
import org.apache.avro.io.EncoderFactory

object CheckRecBuilder extends App {

  val avroSchema: Schema = new Schema.Parser().parse(this.getClass.getResourceAsStream("/data/myschema.avsc"))
  val recordBuilder = new GenericRecordBuilder(avroSchema)

  recordBuilder.set("t_name", "X")
  recordBuilder.set("t_num", 100)


  recordBuilder.set("t_arr", ???)

  val record = recordBuilder.build()


  val w = new GenericDatumWriter[GenericData.Record](avroSchema)
  val outputStream = new ByteArrayOutputStream()
  val e = EncoderFactory.get.binaryEncoder(outputStream, null)
  w.write(record, e)
  e.flush() // binaryEncoder buffers its output; flush before reading the stream
  val barr = outputStream.toByteArray

  println("End")

}
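One detail matters regardless of how t_arr is filled: EncoderFactory.binaryEncoder returns a buffered encoder, so it has to be flushed before outputStream.toByteArray is read, otherwise a small record can come back as an empty or truncated array. A minimal sketch of a write/read round trip that can be used to check the bytes; AvroBytes, toBytes and fromBytes are hypothetical names, not part of the Avro API:

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

// Hypothetical helpers: serialize a GenericRecord to Avro binary (no container
// header) and decode it back with the same schema.
object AvroBytes {
  def toBytes(record: GenericRecord, schema: Schema): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush() // without this, `out` may still be empty
    out.toByteArray
  }

  def fromBytes(bytes: Array[Byte], schema: Schema): GenericRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    new GenericDatumReader[GenericRecord](schema).read(null, decoder)
  }
}
```

For example, AvroBytes.fromBytes(AvroBytes.toBytes(record, avroSchema), avroSchema).get("t_arr") should give back the array that was set.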

Solution

  • I managed to set the array of records.

    I wonder if there is a better or more correct way to do it.

    Here is what I did:

    1. Created a case class:

    case class t_arr_a(t_arr_f1:Int, t_arr_f2:Int)

    2. Created a method that transforms a case class into a GenericData.Record:

    // Relies on getDeclaredFields returning fields in declaration order (typical, but not guaranteed by the JVM).
    def caseClassToGenericDataRecord(cc: Product, schema: Schema): GenericData.Record = {
      val childRecord = new GenericData.Record(schema.getElementType)
      val values = cc.productIterator
      cc.getClass.getDeclaredFields.foreach(f => childRecord.put(f.getName, values.next()))
      childRecord
    }

    3. Updated the CheckRecBuilder object above, replacing:

    recordBuilder.set("t_arr", ???)

    with:

      // The array schema is the second branch of the ["null", array] union.
      val childSchema = avroSchema.getField("t_arr").schema().getTypes.get(1)
      val tArray = Array(t_arr_a(2, 4), t_arr_a(25, 14))
      val tArrayGRecords: java.util.List[GenericData.Record] =
        java.util.Arrays.asList(tArray.map(x => caseClassToGenericDataRecord(x, childSchema)): _*)

      recordBuilder.set("t_arr", tArrayGRecords)
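On the "better way" question, one alternative sketch (not the only correct approach): build each array element with its own GenericRecordBuilder, so any field left unset would fall back to its schema default, and pair names with values via productElementNames instead of reflecting on getDeclaredFields, whose order the JVM does not guarantee. It assumes Scala 2.13 (productElementNames, scala.jdk.CollectionConverters), Avro on the classpath, and the schema inlined; CheckRecBuilderSketch, toRecord, buildRecord and serialize are hypothetical names:

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io.EncoderFactory
import scala.jdk.CollectionConverters._

// Mirrors the item record; field names must match the Avro field names.
case class t_arr_a(t_arr_f1: Int, t_arr_f2: Int)

object CheckRecBuilderSketch {
  val avroSchema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"test1","namespace":"ns","fields":[
      |  {"name":"t_name","type":"string","default":"a"},
      |  {"name":"t_num","type":"int","default":0},
      |  {"name":"t_arr","type":["null",{"type":"array","items":{
      |    "name":"t_arr_a","type":"record","fields":[
      |      {"name":"t_arr_f1","type":"int","default":0},
      |      {"name":"t_arr_f2","type":"int","default":0}]}}]}
      |]}""".stripMargin)

  // Array branch of the ["null", array] union, selected by type rather than index.
  val arraySchema: Schema = avroSchema.getField("t_arr").schema().getTypes.asScala
    .find(_.getType == Schema.Type.ARRAY).get

  // Hypothetical helper: productElementNames yields field names in declaration
  // order, zipped with the matching values from productIterator.
  def toRecord(cc: Product, itemSchema: Schema): GenericData.Record = {
    val builder = new GenericRecordBuilder(itemSchema)
    cc.productElementNames.zip(cc.productIterator).foreach { case (name, value) =>
      builder.set(name, value)
    }
    builder.build()
  }

  def buildRecord(items: Seq[t_arr_a]): GenericData.Record = {
    // GenericData.Array is Avro's own java.util.List implementation for array values.
    val tArr = new GenericData.Array[GenericData.Record](
      arraySchema, items.map(toRecord(_, arraySchema.getElementType)).asJava)
    new GenericRecordBuilder(avroSchema)
      .set("t_name", "X")
      .set("t_num", 100)
      .set("t_arr", tArr)
      .build()
  }

  def serialize(record: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](avroSchema).write(record, encoder)
    encoder.flush() // the binary encoder is buffered
    out.toByteArray
  }
}
```

Usage: CheckRecBuilderSketch.serialize(CheckRecBuilderSketch.buildRecord(Seq(t_arr_a(2, 4), t_arr_a(25, 14)))). Using GenericData.Array is a stylistic choice; a plain java.util.List, as in the solution above, is also accepted, because GenericDatumWriter writes any java.util.Collection for an array schema.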