Tags: apache-spark, avro, apache-beam, serializable, encoder

How to select a single encoder for all subclasses of Avro SpecificRecordBase in Apache Beam?


Background

My Beam Pipeline is designed to process elements of Avro SpecificRecordBase type.

To simplify my problem, let's say I have two kinds of elements generated in Avro format, each with its own fields:

class Dog extends SpecificRecordBase {
    ....
}

class Cat extends SpecificRecordBase {
   ...
}

The pipeline reads elements from an input Kafka topic, processes them, and puts the processed elements into an output Kafka topic, like below:

Pipeline pipeline = Pipeline.create(getOptions());
pipeline.getCoderRegistry().registerCoderForClass(SpecificRecordBase.class, <what shall I put here?>);

pipeline.apply(kafkaReaderTransformer)
        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(getWindowSize()))))
        .apply(GroupByKey.create())
        .apply(ParDo.of(new GiveShowerToPetDoFn()))
        .apply(Flatten.iterables())
        .apply(kafkaWriterTransformer);

Question

My question is: how do I register the encoder in my pipeline? Since the pipeline can read from the Cat Kafka topic or the Dog Kafka topic, and maybe a Toad Kafka topic in the future, I need a generic way to register an encoder that can serialize every subclass of SpecificRecordBase, where the concrete class is only decided at runtime.

My failed attempts

I have tried the following to fill in the <what shall I put here?> blank in the code:

  1. AvroCoder.of(SpecificRecordBase.class): Not working

    I got the error below when running the pipeline:

    Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: class org.apache.avro.specific.SpecificRecordBase
     at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
     at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
     at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
     at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
     at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
     ... 23 more
    

    Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: class org.apache.avro.specific.SpecificRecordBase
     at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
     at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
     at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
     at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
     at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
     at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
     at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
     at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
     ... 27 more

  2. SerializableCoder.of(SpecificRecordBase.class): Confusing exception thrown

This seemed like a promising option, but I got the very confusing error below when I ran the pipeline. It is confusing because Cat actually implements Serializable through its inheritance from SpecificRecordBase:

Caused by: java.lang.ClassCastException: Cat cannot be cast to java.io.Serializable
 at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:53)
 at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
 at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:578)
 at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:569)
 at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
 at org.apache.beam.runners.spark.coders.CoderHelpers.toByteArray(CoderHelpers.java:53)
 at org.apache.beam.runners.spark.coders.CoderHelpers.lambda$toByteFunction$28e77fe8$1(CoderHelpers.java:143)
 at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1043)
 at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1043)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
 at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
 at org.apache.spark.scheduler.Task.run(Task.scala:109)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

  3. Do not register an encoder myself; let Beam infer it.

     This do-nothing solution works on my local standalone machine, but as soon as I put it into a real multi-server environment, it throws an exception indicating that it cannot infer an encoder.

Caused by: java.lang.IllegalStateException: Unable to return a default Coder for ParDo(Deserialize)/ParMultiDo(Deserialize).output [PCollection]. Correct one of the following root causes:
 No Coder has been manually specified; you may do so using .setCoder().
 Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<java.lang.String, org.apache.avro.specific.SpecificRecordBase>: Unable to provide a Coder for org.apache.avro.specific.SpecificRecordBase.
 Building a Coder using a registered CoderProvider failed.
 See suppressed exceptions for detailed failures.
 Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
 at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:507)
 at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:278)
 at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:115)
 at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:191)
 at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:538)
 at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:473)
 at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:357)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)


Solution

  • I finally resolved the issue posted here using a workaround.

    Root cause

    It turned out to be caused by incompatible encoder dependencies across environments. While the encoder works in my local environment, the production environment has different dependency versions, which leaves the Beam library unable to encode and decode the classes derived from SpecificRecordBase.

    Two solutions

    1) Change every DoFn in your pipeline to use bytes as input and bytes as output:

    public class GiveShowerToPetDoFn extends DoFn<KV<String, byte[]>, KV<String, byte[]>> {
       ...
    }
    

    This means that you deserialize the object from bytes manually before you execute the actual business logic, and serialize your result back to bytes as the last step. This makes Beam use the default bytes encoder/decoder between the applied DoFns, and that encoder/decoder always works because it handles a basic type rather than a self-defined type.
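
    For illustration, here is a minimal sketch of what such a byte-oriented DoFn could look like. It is an assumption-laden sketch, not my exact code: the incoming bytes are assumed to hold a Dog record and the business logic is a placeholder; the Avro SpecificDatumReader/SpecificDatumWriter plumbing is the relevant part:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.avro.specific.SpecificDatumWriter;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    public class GiveShowerToPetDoFn extends DoFn<KV<String, byte[]>, KV<String, byte[]>> {

        @ProcessElement
        public void processElement(ProcessContext context) throws IOException {
            KV<String, byte[]> element = context.element();

            // Deserialize manually: bytes -> concrete SpecificRecordBase subclass.
            // Dog is a placeholder; each pipeline instance knows at runtime which
            // concrete type its Kafka topic carries.
            SpecificDatumReader<Dog> reader = new SpecificDatumReader<>(Dog.class);
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(element.getValue(), null);
            Dog dog = reader.read(null, decoder);

            // ... actual business logic (give the pet a shower) ...

            // Serialize manually: record -> bytes, as the last step.
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            SpecificDatumWriter<Dog> writer = new SpecificDatumWriter<>(Dog.class);
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            writer.write(dog, encoder);
            encoder.flush();

            context.output(KV.of(element.getKey(), out.toByteArray()));
        }
    }

    With this shape, Beam only ever needs a coder for KV<String, byte[]>, which it can always provide out of the box.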

    2) Write your own encoder/decoder for your custom type.
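
    For reference, a minimal sketch of what such a generic coder could look like, assuming you prefix each record with its concrete class name so that decoding can recover the right subclass (SpecificRecordBaseCoder is a hypothetical name, not an existing Beam class):

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.avro.specific.SpecificDatumWriter;
    import org.apache.avro.specific.SpecificRecordBase;
    import org.apache.beam.sdk.coders.CustomCoder;

    public class SpecificRecordBaseCoder extends CustomCoder<SpecificRecordBase> {

        @Override
        public void encode(SpecificRecordBase value, OutputStream outStream) throws IOException {
            DataOutputStream dataOut = new DataOutputStream(outStream);
            // Write the concrete class name first so decode() knows which schema to use.
            dataOut.writeUTF(value.getClass().getName());
            SpecificDatumWriter<SpecificRecordBase> writer =
                new SpecificDatumWriter<>(value.getSchema());
            // Use the direct (unbuffered) encoder so we do not write past this record.
            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(dataOut, null);
            writer.write(value, encoder);
            encoder.flush();
        }

        @Override
        @SuppressWarnings("unchecked")
        public SpecificRecordBase decode(InputStream inStream) throws IOException {
            DataInputStream dataIn = new DataInputStream(inStream);
            String className = dataIn.readUTF();
            try {
                Class<SpecificRecordBase> clazz =
                    (Class<SpecificRecordBase>) Class.forName(className);
                SpecificDatumReader<SpecificRecordBase> reader = new SpecificDatumReader<>(clazz);
                // Use the direct (unbuffered) decoder so we do not read past this record.
                BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(dataIn, null);
                return reader.read(null, decoder);
            } catch (ClassNotFoundException e) {
                throw new IOException("Unknown record class: " + className, e);
            }
        }
    }

    Such a coder could then be registered once for the whole hierarchy via pipeline.getCoderRegistry().registerCoderForClass(SpecificRecordBase.class, new SpecificRecordBaseCoder()); — exactly the blank in the pipeline code above.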

    Solutions 1 and 2 are the same in essence. In my case, I used the first solution to get around my problem.