I wrote a custom Aggregator (an extension of org.apache.spark.sql.expressions.Aggregator) and Spark invokes it correctly as an aggregating function in a groupBy statement:
sparkSession
    .createDataFrame(...)
    .groupBy(col("id"))
    .agg(new MyCustomAggregator().toColumn().name("aggregation_result"))
    .show();
I would like to use it within a window function, though, because ordering matters to me. I've tried invoking it like this:
sparkSession
    .createDataFrame(...)
    .withColumn("aggregation_result", new MyCustomAggregator().toColumn().over(Window
        .partitionBy(col("id"))
        .orderBy(col("order"))))
    .show();
That's the error I get:
org.apache.spark.sql.AnalysisException: cannot resolve '(PARTITION BY `id` ORDER BY `order` ASC NULLS FIRST unspecifiedframe$())' due to data type mismatch: Cannot use an UnspecifiedFrame. This should have been converted during analysis. Please file a bug report.
Is it at all possible to use custom Aggregators as window functions in Spark 3.0.1? If so, what am I missing here?
Yes, Spark 3 does indeed support custom Aggregators as window functions. The trick is to wrap the Aggregator in a UserDefinedFunction via functions.udaf instead of calling toColumn() directly. Here is the Java code:
UserDefinedFunction myCustomAggregation =
    functions.udaf(new MyCustomAggregator(), Encoders.bean(AggregationInput.class));

sparkSession
    .createDataFrame(...)
    .withColumn("aggregation_result", myCustomAggregation
        .apply(col("aggregation_input1"), col("aggregation_input2"))
        .over(Window
            .partitionBy(col("id"))
            .orderBy(col("order"))))
    .show();
AggregationInput here is a simple DTO with the row elements needed for your aggregation function.
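For completeness, a minimal sketch of what such a DTO might look like; the field names and types are assumptions chosen to match the two columns passed to apply(...) above:

import java.io.Serializable;

// Hypothetical DTO for the rows fed into the aggregator.
// Encoders.bean(...) needs a public no-arg constructor plus getters and setters.
public class AggregationInput implements Serializable {
    private String aggregationInput1;
    private String aggregationInput2;

    public AggregationInput() {
    }

    public String getAggregationInput1() {
        return aggregationInput1;
    }

    public void setAggregationInput1(String aggregationInput1) {
        this.aggregationInput1 = aggregationInput1;
    }

    public String getAggregationInput2() {
        return aggregationInput2;
    }

    public void setAggregationInput2(String aggregationInput2) {
        this.aggregationInput2 = aggregationInput2;
    }
}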
So whether you aggregate under a groupBy or as a window function, you still want to use org.apache.spark.sql.expressions.Aggregator.
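For illustration, here is a minimal sketch of an Aggregator that fits this setup. The question doesn't show MyCustomAggregator's implementation, so the buffer/output types and the concatenation logic below are purely hypothetical:

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;

// Hypothetical example: concatenates the two input fields of each row,
// in the order the rows arrive, into one comma-separated string.
// Your real buffer type, result type, and logic will differ.
public class MyCustomAggregator extends Aggregator<AggregationInput, String, String> {

    @Override
    public String zero() {
        return "";                        // empty buffer for an empty partition/frame
    }

    @Override
    public String reduce(String buffer, AggregationInput input) {
        String piece = input.getAggregationInput1() + ":" + input.getAggregationInput2();
        return buffer.isEmpty() ? piece : buffer + "," + piece;
    }

    @Override
    public String merge(String left, String right) {
        if (left.isEmpty()) return right;
        if (right.isEmpty()) return left;
        return left + "," + right;
    }

    @Override
    public String finish(String reduction) {
        return reduction;                 // the buffer is already the final result
    }

    @Override
    public Encoder<String> bufferEncoder() {
        return Encoders.STRING();
    }

    @Override
    public Encoder<String> outputEncoder() {
        return Encoders.STRING();
    }
}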