Im trying to write a custom UDAF for KSQLDB.
First i wanted to try out the example: https://docs.ksqldb.io/en/latest/how-to-guides/create-a-user-defined-function/#implement-the-class
The Schemas of the structs will be submited via the Schemas_describtor. "This communicates the underlying types to ksqlDB in a way that its type system can understand." F.E
@UdafFactory(description = "describtonOfFunction",
paramSchema = PARAM_SCHEMA_DESCRIPTOR,
aggregateSchema = AGGREGATE_SCHEMA_DESCRIPTOR,
returnSchema = RETURN_SCHEMA_DESCRIPTOR)`
When i build the shadowJar and submit it to ksqldbs extension_dir without the Schemas(param,aggregate and return), KSQLDB is starting up with no errors and the function is showing up. I need to use structs, so i need to submit a schema to ksqldb. When i add the SchemaDescriptor to the UDAF Factory, i get a null pointer exception from Java and an annotationParser exception. SchemaBuilder and descriptor, gradle and logs below:
public static final Schema PARAM_SCHEMA = SchemaBuilder.struct().optional()
.field("C", Schema.OPTIONAL_INT64_SCHEMA)
.build();
public static final String PARAM_SCHEMA_DESCRIPTOR = "STRUCT<" +
"C BIGINT" +
">";
public static final Schema AGGREGATE_SCHEMA = SchemaBuilder.struct().optional()
.field("MIN", Schema.OPTIONAL_INT64_SCHEMA)
.field("MAX", Schema.OPTIONAL_INT64_SCHEMA)
.field("COUNT", Schema.OPTIONAL_INT64_SCHEMA)
.build();
public static final String AGGREGATE_SCHEMA_DESCRIPTOR = "STRUCT<" +
"MIN BIGINT," +
"MAX BIGINT," +
"COUNT BIGINT" +
">";
public static final Schema RETURN_SCHEMA = SchemaBuilder.struct().optional()
.field("MIN", Schema.OPTIONAL_INT64_SCHEMA)
.field("MAX", Schema.OPTIONAL_INT64_SCHEMA)
.field("COUNT", Schema.OPTIONAL_INT64_SCHEMA)
.field("DIFFERENTIAL", Schema.OPTIONAL_INT64_SCHEMA)
.build();
public static final String RETURN_SCHEMA_DESCRIPTOR = "STRUCT<" +
"MIN BIGINT," +
"MAX BIGINT," +
"COUNT BIGINT," +
"DIFFERENTIAL BIGINT" +
">";
(I just copied the class from the example)
Error Log of KSQLDB:
`java.lang.NullPointerException
at java.base/sun.reflect.annotation.AnnotationParser.parseArray(AnnotationParser.java:533)
at java.base/sun.reflect.annotation.AnnotationParser.parseMemberValue(AnnotationParser.java:356)
at java.base/sun.reflect.annotation.AnnotationParser.parseAnnotation2(AnnotationParser.java:287)
at java.base/sun.reflect.annotation.AnnotationParser.parseAnnotations2(AnnotationParser.java:121)
at java.base/sun.reflect.annotation.AnnotationParser.parseAnnotations(AnnotationParser.java:73)
at java.base/java.lang.reflect.Executable.declaredAnnotations(Executable.java:604)
at java.base/java.lang.reflect.Executable.declaredAnnotations(Executable.java:602)
at java.base/java.lang.reflect.Executable.getAnnotation(Executable.java:572)
at java.base/java.lang.reflect.Method.getAnnotation(Method.java:695)
at io.confluent.ksql.function.UdafLoader.loadUdafFromClass(UdafLoader.java:59)
at io.confluent.ksql.function.UserFunctionLoader.loadFunctions(UserFunctionLoader.java:124)
at io.confluent.ksql.function.UserFunctionLoader.lambda$load$2(UserFunctionLoader.java:97)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
at io.confluent.ksql.function.UserFunctionLoader.load(UserFunctionLoader.java:96)
at io.confluent.ksql.rest.server.KsqlServerMain.loadFunctions(KsqlServerMain.java:133)
at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:80)`
My Gradle Build file has the following dependecies:
dependencies {
implementation "io.confluent.ksql:ksqldb-udf:7.3.0"
implementation "org.apache.kafka:kafka_2.13:2.5.0"
implementation "org.apache.kafka:connect-api:3.3.1"}
I really dont know why KSQLDB cant parse the Schema (Its a provided example class from KSQLDB)...
I am able to submit the UDAF with param,aggregate and return Schema without an annotationParser Exception after downgrading from:
implementation "io.confluent.ksql:ksqldb-udf:7.3.0"
to:
implementation "io.confluent.ksql:ksqldb-udf:5.5.1"