Tags: java, apache-spark, hdfs, avro, gobblin

How do I use Java to read Avro data in Spark 1.3.1?


I am trying to develop a Java Spark application that reads Avro records (https://avro.apache.org/) from HDFS, where they are written by Gobblin (https://github.com/linkedin/gobblin/wiki).

A sample Avro data file on HDFS:

/gobblin/work/job-output/KAFKA/kafka-gobblin-hdfs-test/20150910213846_append/part.task_kafka-gobblin-hdfs-test_1441921123461_0.avro

Unfortunately, I am finding that there are limited examples written in Java.

The best example I have found is written in Scala (using Hadoop version 1 libraries).

Any help would be appreciated.

Currently I am thinking of using the code below, though I am unsure how to extract a HashMap of values from my Avro data:

@SuppressWarnings( "unchecked" ) // raw Class arguments make this an unchecked assignment
JavaPairRDD<AvroKey<GenericRecord>, NullWritable> avroRDD = sc.newAPIHadoopFile(
    path,
    AvroKeyInputFormat.class,
    AvroKey.class,
    NullWritable.class,
    new Configuration() );

// JavaPairRDD avroRDD = sc.newAPIHadoopFile( 
//    path, 
//    AvroKeyValueInputFormat.class, 
//    AvroKey.class, 
//    AvroValue.class, 
//    new Configuration() );
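For what I am trying to achieve, I imagine something along these lines (untested; assuming the records come back as GenericRecord instances, and with illustrative handling of Avro's Utf8 strings):

import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

JavaRDD<Map<String, Object>> maps = avroRDD.keys().map(
    new Function<AvroKey<GenericRecord>, Map<String, Object>>() {
        @Override
        public Map<String, Object> call( AvroKey<GenericRecord> key ) {
            GenericRecord record = key.datum();
            Map<String, Object> values = new HashMap<String, Object>();
            for ( Schema.Field field : record.getSchema().getFields() ) {
                Object value = record.get( field.name() );
                // Avro returns Utf8 for string fields; convert for plain-Java use.
                values.put( field.name(), value instanceof CharSequence ? value.toString() : value );
            }
            return values;
        }
    } );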

My current Maven dependencies:

<dependencies>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.3.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.7.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-mapred</artifactId>
        <version>1.7.6</version>
        <classifier>hadoop2</classifier>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.4.3</version>
    </dependency>


    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
    </dependency>

</dependencies>

Solution

  • I wrote a small prototype that was able to read my sample Gobblin Avro records as input and, using Spark, output the relevant results (spark-hdfs-avro-test). It is worth mentioning that there were a couple of issues I needed to address. Any comments or feedback would be greatly appreciated.

    Issue 1: There are issues with the current Avro release (1.7.7) and Java Serialization:

    To quote:

    Spark relies on Java's Serializable interface to serialize objects. Avro objects don't implement Serializable. So, to work with Avro objects in Spark, you need to subclass your Avro generated classes and implement Serializable, e.g. https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SerializableAminoAcid.java.

    To address this I wrote my own Serializable wrapper classes:
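    In outline, the wrapper follows this pattern (shown here as a minimal sketch for an Avro-generated Event class, not the exact code: the record is held transiently and round-tripped through Avro's binary encoding):

    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.avro.specific.SpecificDatumWriter;

    public class SerializableEvent implements Serializable {

        // Avro-generated classes are not Serializable, so the record is transient.
        private transient Event event;

        public SerializableEvent( Event event ) {
            this.event = event;
        }

        public Event get() {
            return this.event;
        }

        // Serialize the record with Avro's binary encoding instead of Java serialization.
        // The "direct" encoder avoids buffering that could corrupt the object stream.
        private void writeObject( ObjectOutputStream out ) throws IOException {
            SpecificDatumWriter<Event> writer = new SpecificDatumWriter<Event>( Event.class );
            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder( out, null );
            writer.write( this.event, encoder );
            encoder.flush();
        }

        // Rebuild the record from the stream with Avro's binary decoding.
        private void readObject( ObjectInputStream in ) throws IOException {
            SpecificDatumReader<Event> reader = new SpecificDatumReader<Event>( Event.class );
            BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder( in, null );
            this.event = reader.read( null, decoder );
        }
    }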

    Issue 2: My Avro messages don't contain a "key" value.

    Unfortunately, I was unable to use any of the out-of-the-box input formats and had to write my own: AvroValueInputFormat (a fuller sketch follows the declaration below).

    public class AvroValueInputFormat<T> extends FileInputFormat<NullWritable, AvroValue<T>> {
    
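    In outline, such an input format can be modeled on Avro's own AvroKeyRecordReader, with the record re-mapped into the value position (a minimal sketch, not necessarily the exact implementation):

    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.mapred.AvroValue;
    import org.apache.avro.mapreduce.AvroJob;
    import org.apache.avro.mapreduce.AvroRecordReaderBase;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    // Exposes each Avro record as the pair's value, with a NullWritable key.
    public class AvroValueInputFormat<T> extends FileInputFormat<NullWritable, AvroValue<T>> {

        @Override
        public RecordReader<NullWritable, AvroValue<T>> createRecordReader(
                InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException {
            // The reader schema is taken from "avro.schema.input.value" (see Issue 3).
            Schema readerSchema = AvroJob.getInputValueSchema( context.getConfiguration() );
            return new AvroValueRecordReader<T>( readerSchema );
        }

        // Delegates all file handling to AvroRecordReaderBase; only key/value mapping differs.
        private static class AvroValueRecordReader<T>
                extends AvroRecordReaderBase<NullWritable, AvroValue<T>, T> {

            private final AvroValue<T> currentValue = new AvroValue<T>( null );

            AvroValueRecordReader( Schema readerSchema ) {
                super( readerSchema );
            }

            @Override
            public NullWritable getCurrentKey() {
                return NullWritable.get();
            }

            @Override
            public AvroValue<T> getCurrentValue() {
                currentValue.datum( getCurrentRecord() );
                return currentValue;
            }
        }
    }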

    I was unable to use the following:

    // org.apache.avro.mapreduce.AvroKeyInputFormat
    public class AvroKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable> {

    // org.apache.avro.mapreduce.AvroKeyValueInputFormat
    public class AvroKeyValueInputFormat<K, V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {

    Issue 3: I was unable to use the AvroJob class setters to set schema values, so I had to set them manually:

        hadoopConf.set( "avro.schema.input.key", Schema.create( org.apache.avro.Schema.Type.NULL ).toString() ); //$NON-NLS-1$
        hadoopConf.set( "avro.schema.input.value", Event.SCHEMA$.toString() ); //$NON-NLS-1$
        hadoopConf.set( "avro.schema.output.key", Schema.create( org.apache.avro.Schema.Type.NULL ).toString() ); //$NON-NLS-1$
        hadoopConf.set( "avro.schema.output.value", SeverityEventCount.SCHEMA$.toString() ); //$NON-NLS-1$
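
    For reference, here is roughly how the pieces fit together in the driver (a sketch based on the snippets above; the Event class, the SerializableEvent wrapper, and AvroValueInputFormat are the ones sketched earlier, and the output-schema settings are omitted since this sketch only reads):

        import org.apache.avro.Schema;
        import org.apache.avro.mapred.AvroValue;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.io.NullWritable;
        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.JavaPairRDD;
        import org.apache.spark.api.java.JavaRDD;
        import org.apache.spark.api.java.JavaSparkContext;
        import org.apache.spark.api.java.function.Function;

        public class AvroSparkDriver {
            public static void main( String[] args ) {
                JavaSparkContext sc = new JavaSparkContext( new SparkConf().setAppName( "avro-test" ) );

                Configuration hadoopConf = new Configuration();
                hadoopConf.set( "avro.schema.input.key", Schema.create( Schema.Type.NULL ).toString() );
                hadoopConf.set( "avro.schema.input.value", Event.SCHEMA$.toString() );

                @SuppressWarnings( "unchecked" ) // raw Class arguments make this an unchecked assignment
                JavaPairRDD<NullWritable, AvroValue<Event>> avroRDD = sc.newAPIHadoopFile(
                    args[0], // path to the Gobblin output directory on HDFS
                    AvroValueInputFormat.class,
                    NullWritable.class,
                    AvroValue.class,
                    hadoopConf );

                // Wrap each record so it survives Spark's Java serialization (Issue 1).
                JavaRDD<SerializableEvent> events = avroRDD.values().map(
                    new Function<AvroValue<Event>, SerializableEvent>() {
                        @Override
                        public SerializableEvent call( AvroValue<Event> value ) {
                            return new SerializableEvent( value.datum() );
                        }
                    } );

                System.out.println( "Read " + events.count() + " events" );
                sc.stop();
            }
        }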