I try to call groupByKey on a Dataset using the code below:
SparkSession SPARK_SESSION = new SparkSession(new SparkContext("local", "app"));
JavaSparkContext JAVA_SPARK_CONTEXT = new JavaSparkContext(SPARK_SESSION.sparkContext());

@Data
@NoArgsConstructor
@AllArgsConstructor
class Chunk implements Serializable {
    private Integer id;
    private String letters;
}

class JavaAggregator extends Aggregator<Chunk, String, String> {
    @Override
    public String zero() {
        return "";
    }

    @Override
    public String reduce(String b, Chunk a) {
        return b + a.getLetters();
    }

    @Override
    public String merge(String b1, String b2) {
        return b1 + b2;
    }

    @Override
    public String finish(String reduction) {
        return reduction;
    }

    @Override
    public Encoder<String> bufferEncoder() {
        return Encoders.bean(String.class);
    }

    @Override
    public Encoder<String> outputEncoder() {
        return Encoders.bean(String.class);
    }
}

List<Chunk> chunkList = List.of(
        new Chunk(1, "a"), new Chunk(2, "1"), new Chunk(3, "-*-"),
        new Chunk(1, "b"), new Chunk(2, "2"), new Chunk(3, "-**-"),
        new Chunk(1, "c"), new Chunk(2, "3"), new Chunk(3, "-***-"));
Dataset<Row> df = SPARK_SESSION.createDataFrame(JAVA_SPARK_CONTEXT.parallelize(chunkList), Chunk.class);
Dataset<Chunk> ds = df.as(Encoders.bean(Chunk.class));
KeyValueGroupedDataset<Integer, Chunk> grouped = ds.groupByKey((Function1<Chunk, Integer>) v -> v.getId(), Encoders.bean(Integer.class));
But I get an exception:

java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:208)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:87)
    at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
    at org.apache.spark.sql.Encoders.bean(Encoders.scala)
I am not an expert in Scala internals, and it is hard for me to tell what is wrong with my code: the exception is thrown from compiled library code, and the message "assertion failed" is not very helpful. Is there something fundamentally wrong in my code that causes this exception?
I found out that the problem was the usage of Encoders: I should have used Encoders.INT() instead of Encoders.bean(Integer.class). Encoders.bean only works for JavaBean-style classes, so it cannot build an encoder for a boxed primitive like Integer. For the same reason, bufferEncoder and outputEncoder should return Encoders.STRING() rather than Encoders.bean(String.class), since String is not a bean either.
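For reference, here is a minimal self-contained sketch of the corrected version. It replaces the Lombok bean with a plain JavaBean and creates the Dataset directly with createDataset; the class and method names (GroupByKeyFix, concatByKey) are my own, not from the original code:

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.KeyValueGroupedDataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;
import scala.Tuple2;

public class GroupByKeyFix {

    // Plain JavaBean instead of the Lombok version, so the sketch is self-contained.
    public static class Chunk implements Serializable {
        private Integer id;
        private String letters;
        public Chunk() { }
        public Chunk(Integer id, String letters) { this.id = id; this.letters = letters; }
        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }
        public String getLetters() { return letters; }
        public void setLetters(String letters) { this.letters = letters; }
    }

    // Same aggregator, but using Encoders.STRING() -- String is not a JavaBean,
    // so Encoders.bean(String.class) would hit the same assertion.
    public static class JavaAggregator extends Aggregator<Chunk, String, String> {
        @Override public String zero() { return ""; }
        @Override public String reduce(String b, Chunk a) { return b + a.getLetters(); }
        @Override public String merge(String b1, String b2) { return b1 + b2; }
        @Override public String finish(String reduction) { return reduction; }
        @Override public Encoder<String> bufferEncoder() { return Encoders.STRING(); }
        @Override public Encoder<String> outputEncoder() { return Encoders.STRING(); }
    }

    // Groups chunks by id and concatenates their letters per key.
    public static Map<Integer, String> concatByKey(SparkSession spark, List<Chunk> chunks) {
        Dataset<Chunk> ds = spark.createDataset(chunks, Encoders.bean(Chunk.class));

        // Encoders.INT() for the Integer key; Encoders.bean is only for bean classes.
        KeyValueGroupedDataset<Integer, Chunk> grouped =
                ds.groupByKey((MapFunction<Chunk, Integer>) Chunk::getId, Encoders.INT());

        Map<Integer, String> result = new HashMap<>();
        for (Tuple2<Integer, String> t :
                grouped.agg(new JavaAggregator().toColumn()).collectAsList()) {
            result.put(t._1(), t._2());
        }
        return result;
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]").appName("app").getOrCreate();
        List<Chunk> chunks = Arrays.asList(
                new Chunk(1, "a"), new Chunk(2, "1"),
                new Chunk(1, "b"), new Chunk(2, "2"),
                new Chunk(1, "c"), new Chunk(2, "3"));
        System.out.println(concatByKey(spark, chunks));
        spark.stop();
    }
}
```

Note the cast to MapFunction instead of Scala's Function1: both overloads of groupByKey accept an explicit key Encoder, but the MapFunction one is the Java-facing API.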