Tags: java, apache-spark, amazon-s3, avro, spark-avro

org.apache.avro.UnresolvedUnionException: Not in union [{"type":"bytes","logicalType":"decimal","precision":18,"scale":4},"null"]: 0.0000


I am trying to read data stored in a Hive table in S3, convert it to Avro format, and then consume the Avro records to build the final object and push it to a Kafka topic. The object I am trying to publish contains a nested object (CarCostDetails) with string and decimal fields. When this nested object is null, I can push records to Kafka, but if it is populated with any value (zero, positive, or negative), I get this exception when I call producer.send(): org.apache.avro.UnresolvedUnionException: Not in union [{"type":"bytes","logicalType":"decimal","precision":18,"scale":4},"null"]: 40000.0000

I am not defining the schema in my project; I am using a predefined schema that comes in as an external dependency.

Example: CarDataLoad.scala

class CarDataLoad extends ApplicationRunner with Serializable {

  /** Application entry point: builds a Hive-enabled SparkSession and runs the S3 -> Kafka load. */
  override def run(args: ApplicationArguments): Unit = {
    val spark = new SparkSession.Builder()
      .appName("s3-to-kafka")
      .enableHiveSupport()
      .getOrCreate()
    getData(spark)
  }

  /**
   * Copies the source table to Avro, reads it back and publishes the selected columns to Kafka.
   *
   * @param sparkSession active session used for both the copy and the read-back
   */
  def getData(sparkSession: SparkSession): Unit = {
    import sparkSession.implicits._

    val avroPath = copyToAvro(sparkSession)
    val car = sparkSession.read.avro(avroPath)
    val avroData = car.select(
      $"car_specs",
      $"car_cost_details",
      $"car_key"
    )

    ingestDataframeToKafka(sparkSession, avroData)
  }

  /**
   * Reads `sample_table` and writes it to the target path in Avro format.
   *
   * @param sparkSession active session used to read the Hive table
   * @return the path the Avro files were written to
   */
  def copyToAvro(sparkSession: SparkSession): String = {
    // Declared locally: the original assigned to an undeclared name, which does not compile.
    val sourceDf = sparkSession.read.table("sample_table")
    val targetPath = "s3://some/target/path"
    // write sourceDf to targetPath in Avro format (internal libraries to do that; not shown)

    targetPath
  }

  /**
   * Publishes every row of `dataframe` to Kafka through a single [[CarProducerClass]].
   *
   * The producer is closed in `finally` so it is released even if publishing throws.
   *
   * @param sparkSession active session (unused here, kept for the existing signature)
   * @param dataframe    rows with `car_specs`, `car_cost_details` and `car_key` columns
   */
  def ingestDataframeToKafka(sparkSession: SparkSession, dataframe: sql.DataFrame): Unit = {
    val batchProducer = new CarProducerClass(kafkaBootstapServers, kafkaSchemaRegistryUrl,
      kafkaClientIdConfig, topic)
    try {
      // collect() brings every row to the driver; acceptable for small tables only.
      dataframe.collect().foreach(row => batchProducer.publishRecord(row))
    } finally {
      batchProducer.closeProducer()
    }
  }
}

Producer class - CarProducerClass.java

import org.apache.avro.Conversions;
import org.apache.avro.specific.SpecificData;
import org.apache.kafka.clients.producer.*;
import org.apache.spark.sql.Row;

import java.io.Serializable;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.*;

public class CarProducerClass {

private void initializeProducer() {
        log.info("Initializing producer");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstapServers);
        props.put("schema.registry.url", kafkaSchemaRegistryUrl);
        props.put("acks", "1");
        props.put("batch.size", 16384);
        props.put("buffer.memory", 33554432);
        props.put("retries",3);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaClientIdConfig);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("key.subject.name.strategy", "io.confluent.kafka.serializers.subject.TopicNameStrategy");
        props.put("value.subject.name.strategy", "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");
        log.info("Created producer");
        producer = new KafkaProducer(props);
    }
}

public Boolean publishRecord(Row row) {
    Boolean publishRecordFlag = false;
        if (producer == null) {
            initializeProducer();
        }
    Car.Builder car = new Car.newBuilder();
    car.setCarSpecs(buildCarSpecs(row.getAs("car_specs")))
    car.setCarCostDetails(buildCarCostDetails(row.getAs("car_cost_details")))
    CarKey.Builder carKey = new CarKey.Builder();
    Row car_key = row.getAs("car_key");
    carKey.setKey(car_key.getAs("car_id"))
    
        try{
             ProducerRecord<CarKey, Car> producerRecord
                    = new ProducerRecord(topic, null, System.currentTimeMillis(), carKey.build(), car.build());
                //Exception occurs here 
                RecordMetadata metadata = (RecordMetadata) producer.send(producerRecord).get();
       
          } catch (Exception e){
            log.info("Exception caught");
            e.printStackTrace();
          }
     
    public CarSpecs buildCarSpecs (Row car_specs){
        CarSpecs.Builder kafkaCarSpecs = CarSpecs.newBuilder();
         kafkaCarSpecs.setCarName("CX5");
         kafkaCarSpecs.setCarBrand("Mazda"); 
    }

    public CostDetails buildCarCostDetails (Row car_cost_details){
        CarSpecs.Builder kafkaCarSpecs = CarSpecs.newBuilder();
        kafkaCarSpecs.setPurchaseCity(car_cost_details.getAs("purchase_city"));
        kafkaCarSpecs.setPurchaseState(car_cost_details.getAs("purchase_state"));
        kafkaCarSpecs.setBasePrice((BigDecimal)car_cost_details.getAs("base_price"));
        kafkaCarSpecs.setTax((BigDecimal)car_cost_details.getAs("tax")); 
        kafkaCarSpecs.setTotalCost((BigDecimal)car_cost_details.getAs("total_cost")); 
        kafkaCarSpecs.setOtherCosts((BigDecimal)car_cost_details.getAs("other_costs")); 
    }
    public void closeProducer(){
        producer.close();
    }}

Avro schema (predefined in another project that is already in production)

CarSpecs.avdl

// CarSpecs record; imported by Car.avdl via `import idl "CarSpecs.avdl"`.
protocol CarSpecsProtocol {

  // Both fields are required (non-nullable) strings.
  record CarSpecs {
    string name;
    string brand;

  }
}

CarCostDetails.avdl

// CarCostDetails record; imported by Car.avdl and used as the nullable car_cost_details field.
protocol CarCostDetailsProtocol {

  record CarCostDetails {
    string purchase_city;
    string purchase_state;
    // Required decimal, precision 18 / scale 4, serialized as Avro "bytes" with logicalType "decimal".
    decimal(18, 4) base_price;
    // Nullable decimals (decimal branch listed first). Writing a java.math.BigDecimal into these
    // unions needs Avro's DecimalConversion registered on the SpecificData instance used by the
    // writer; otherwise serialization fails with UnresolvedUnionException ("Not in union ...").
    union { decimal(18,4), null} tax;
    union { decimal(18,4), null} total_cost;
    union { decimal(18,4), null} other_costs;
  }
}

Car.avdl

// Top-level Car value record, composed from the two imported protocols.
protocol CarProtocol {
  import idl "CarCostDetails.avdl";
  import idl "CarSpecs.avdl";
  // Both nested records are optional and default to null.
  record Car {
    union { null, CarSpecs} car_specs = null;
    union { null, CarCostDetails} car_cost_details = null;
  }
}

CarKey.avdl

// Record used as the Kafka message key (paired with Car as the value by the producer).
protocol CarKeyProtocol {

  // Single required string identifier.
  record CarKey {
     string id;
  }
}

Avro generated Java Objects

// Generated from CarSpecs.avdl; do not edit by hand, regenerate from the schema instead.
@AvroGenerated
public class CarSpecs extends SpecificRecordBase implements SpecificRecord {
//basic generated fields like Schema SCHEMA$, SpecificData MODEL$ etc 
private String name;
private String brand;
}


// Generated from CarCostDetails.avdl; do not edit by hand, regenerate from the schema instead.
// NOTE(review): as pasted, @AvroGenerated precedes an import, which is not valid Java; presumably
// the import sits at the top of the real generated file and the annotation is on the class.
// The BigDecimal fields correspond to the decimal(18,4) schema fields. Serializing them requires
// Avro's DecimalConversion on the SpecificData instance the serializer uses, not only on MODEL$.
@AvroGenerated
import java.math.BigDecimal;
public class CarCostDetails extends SpecificRecordBase implements SpecificRecord {
//basic generated fields like Schema SCHEMA$, SpecificData MODEL$ etc 
private String purchaseCity;
private String purchaseState;
private BigDecimal basePrice;
private BigDecimal tax;
private BigDecimal totalCost;
private BigDecimal otherCosts;

}


// Generated from Car.avdl; do not edit by hand. Both nested records may be null (union with null).
@AvroGenerated
public class Car extends SpecificRecordBase implements SpecificRecord {
//basic generated fields like Schema SCHEMA$, SpecificData MODEL$ etc 
private CarSpecs carSpecs;
private CarCostDetails carCostDetails;

}

// Generated from CarKey.avdl; do not edit by hand. Used as the Kafka record key.
@AvroGenerated
public class CarKey extends SpecificRecordBase implements SpecificRecord {
//basic generated fields like Schema SCHEMA$, SpecificData MODEL$ etc 
private String id;
}

What I have already tried:

  1. Passing the spark-avro package in spark command like so --packages org.apache.spark:spark-avro_2.11:2.4.3
  2. Ordering the fields like they are in the actual schema
  3. Setting a default value of 0 for all decimal/BigDecimal fields
  4. Checking whether the source's data type for these fields is java.math.BigDecimal. It is.
  5. Explicitly casting the value to BigDecimal (like in example above)

All the above still result in org.apache.avro.UnresolvedUnionException


Solution

  • Register the decimal conversion in the global configuration (do this once at runtime, before sending any messages to Kafka — e.g., in initializeProducer):

    import org.apache.avro.specific.SpecificData;
    import org.apache.avro.Conversions;
    
    // Register once, before the first producer.send(): adds a BigDecimal <-> decimal conversion
    // to the global SpecificData instance so unions containing decimal(...) can be resolved.
    SpecificData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
    

    You might see a similar line, applied to MODEL$, in the static initializer generated from the Avro schema, so remember to register every conversion used in your messages.

    The following observations are based on the Avro 1.10.1 library source and its runtime behavior.

    The MODEL$ configuration should be applied (see SpecificData.getForClass). However, it might not be if SpecificData and your message class are loaded by different class loaders; that was the case in my application, which used two separate OSGi bundles. In that case, getForClass falls back to the global instance.

    GenericData.resolveUnion then throws UnresolvedUnionException for two reasons. First, conversionsByClass has no entry for the BigDecimal.class key. Second, getSchemaName (overridden in SpecificData) returns Schema.Type.STRING for the BigDecimal class and a few others (see SpecificData.stringableClasses). This STRING name is then looked up among the union's branches (getIndexNamed). It is not found, because the only branches are "bytes" and "null".