Tags: java, apache-spark, apache-spark-sql, encoder

Apache Spark + Java: "java.lang.AssertionError: assertion failed" in ExpressionEncoder


I am trying to call groupByKey on a Dataset using the code below:

    SparkSession SPARK_SESSION = new SparkSession(new SparkContext("local", "app"));
    JavaSparkContext JAVA_SPARK_CONTEXT = new JavaSparkContext(SPARK_SESSION.sparkContext());

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    class Chunk implements Serializable {
        private Integer id;
        private String letters;
    }

    class JavaAggregator extends Aggregator<Chunk, String, String> {

        @Override
        public String zero() {
            return "";
        }

        @Override
        public String reduce(String b, Chunk a) {
            return b + a.getLetters();
        }

        @Override
        public String merge(String b1, String b2) {
            return b1 + b2;
        }

        @Override
        public String finish(String reduction) {
            return reduction;
        }

        @Override
        public Encoder<String> bufferEncoder() {
            return Encoders.bean(String.class);
        }

        @Override
        public Encoder<String> outputEncoder() {
            return Encoders.bean(String.class);
        }
    }

    List<Chunk> chunkList = List.of(
            new Chunk(1, "a"), new Chunk(2, "1"), new Chunk(3, "-*-"),
            new Chunk(1, "b"), new Chunk(2, "2"), new Chunk(3, "-**-"),
            new Chunk(1, "c"), new Chunk(2, "3"), new Chunk(3, "-***-"));

    Dataset<Row> df = SPARK_SESSION.createDataFrame(JAVA_SPARK_CONTEXT.parallelize(chunkList), Chunk.class);
    Dataset<Chunk> ds = df.as(Encoders.bean(Chunk.class));

    KeyValueGroupedDataset<Integer, Chunk> grouped = ds.groupByKey((Function1<Chunk, Integer>)  v -> v.getId(), Encoders.bean(Integer.class));

But I get an exception saying java.lang.AssertionError: assertion failed:

    at scala.Predef$.assert(Predef.scala:208)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:87)
    at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
    at org.apache.spark.sql.Encoders.bean(Encoders.scala)

I am not an expert in Scala internals, and it is hard for me to tell what is wrong: the exception is thrown from compiled code, and the assertion message "assertion failed" is not very helpful. Is there something fundamentally wrong in my code that causes this exception?


Solution

  • I found out that the problem was with the usage of Encoders. Encoders.bean only works for JavaBean classes (public class with a no-arg constructor and getters/setters); for boxed primitives and String, Spark's built-in encoders must be used instead. I should have used Encoders.INT() instead of Encoders.bean(Integer.class) in the groupByKey call, and likewise Encoders.STRING() instead of Encoders.bean(String.class) in bufferEncoder() and outputEncoder().
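
A corrected end-to-end sketch, assuming Spark 3.x on the classpath (Lombok is dropped in favor of a plain JavaBean, and the class/app names here are illustrative):

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.KeyValueGroupedDataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;

public class GroupByKeyExample {

    // Plain JavaBean (public, no-arg constructor, getters/setters),
    // which is what Encoders.bean(Chunk.class) requires.
    public static class Chunk implements Serializable {
        private Integer id;
        private String letters;

        public Chunk() {}
        public Chunk(Integer id, String letters) { this.id = id; this.letters = letters; }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }
        public String getLetters() { return letters; }
        public void setLetters(String letters) { this.letters = letters; }
    }

    public static class JavaAggregator extends Aggregator<Chunk, String, String> {
        @Override public String zero() { return ""; }
        @Override public String reduce(String b, Chunk a) { return b + a.getLetters(); }
        @Override public String merge(String b1, String b2) { return b1 + b2; }
        @Override public String finish(String reduction) { return reduction; }

        // Built-in String encoder: Encoders.bean(String.class) fails the
        // assertion because String is not a JavaBean.
        @Override public Encoder<String> bufferEncoder() { return Encoders.STRING(); }
        @Override public Encoder<String> outputEncoder() { return Encoders.STRING(); }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("app")
                .getOrCreate();

        List<Chunk> chunks = Arrays.asList(
                new Chunk(1, "a"), new Chunk(2, "1"),
                new Chunk(1, "b"), new Chunk(2, "2"));

        Dataset<Chunk> ds = spark.createDataset(chunks, Encoders.bean(Chunk.class));

        // Encoders.INT() instead of Encoders.bean(Integer.class), and the
        // Java-friendly MapFunction instead of a cast to Scala's Function1.
        KeyValueGroupedDataset<Integer, Chunk> grouped =
                ds.groupByKey((MapFunction<Chunk, Integer>) Chunk::getId, Encoders.INT());

        grouped.agg(new JavaAggregator().toColumn()).show();

        spark.stop();
    }
}
```

The same rule applies to any non-bean key or buffer type: Encoders provides LONG(), DOUBLE(), BOOLEAN(), and so on for the boxed primitives, and STRING() for String.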