
KSQLDB: write custom UDAF with struct type (annotationparser exception)

Im trying to write a custom UDAF for KSQLDB.

First i wanted to try out the example:

The Schemas of the structs will be submited via the Schemas_describtor. "This communicates the underlying types to ksqlDB in a way that its type system can understand." F.E

 @UdafFactory(description = "describtonOfFunction",
                   paramSchema = PARAM_SCHEMA_DESCRIPTOR,
                   aggregateSchema = AGGREGATE_SCHEMA_DESCRIPTOR,
                   returnSchema = RETURN_SCHEMA_DESCRIPTOR)`

When i build the shadowJar and submit it to ksqldbs extension_dir without the Schemas(param,aggregate and return), KSQLDB is starting up with no errors and the function is showing up. I need to use structs, so i need to submit a schema to ksqldb. When i add the SchemaDescriptor to the UDAF Factory, i get a null pointer exception from Java and an annotationParser exception. SchemaBuilder and descriptor, gradle and logs below:

    public static final Schema PARAM_SCHEMA = SchemaBuilder.struct().optional()
    .field("C", Schema.OPTIONAL_INT64_SCHEMA)

public static final String PARAM_SCHEMA_DESCRIPTOR = "STRUCT<" +
    "C BIGINT" +

public static final Schema AGGREGATE_SCHEMA = SchemaBuilder.struct().optional()
    .field("MIN", Schema.OPTIONAL_INT64_SCHEMA)
    .field("MAX", Schema.OPTIONAL_INT64_SCHEMA)
    .field("COUNT", Schema.OPTIONAL_INT64_SCHEMA)

public static final String AGGREGATE_SCHEMA_DESCRIPTOR = "STRUCT<" +
    "MIN BIGINT," +
    "MAX BIGINT," +

public static final Schema RETURN_SCHEMA = SchemaBuilder.struct().optional()
    .field("MIN", Schema.OPTIONAL_INT64_SCHEMA)
    .field("MAX", Schema.OPTIONAL_INT64_SCHEMA)
    .field("COUNT", Schema.OPTIONAL_INT64_SCHEMA)

public static final String RETURN_SCHEMA_DESCRIPTOR = "STRUCT<" +
    "MIN BIGINT," +
    "MAX BIGINT," +

(I just copied the class from the example)

Error Log of KSQLDB:

    at java.base/sun.reflect.annotation.AnnotationParser.parseArray(
   at java.base/sun.reflect.annotation.AnnotationParser.parseMemberValue(
    at java.base/sun.reflect.annotation.AnnotationParser.parseAnnotation2(
    at java.base/sun.reflect.annotation.AnnotationParser.parseAnnotations2(
    at java.base/sun.reflect.annotation.AnnotationParser.parseAnnotations(
    at java.base/java.lang.reflect.Executable.declaredAnnotations(
     at java.base/java.lang.reflect.Executable.declaredAnnotations(
    at java.base/java.lang.reflect.Executable.getAnnotation(
     at java.base/java.lang.reflect.Method.getAnnotation(
     at io.confluent.ksql.function.UdafLoader.loadUdafFromClass(
    at io.confluent.ksql.function.UserFunctionLoader.loadFunctions(
    at io.confluent.ksql.function.UserFunctionLoader.lambda$load$2(
     at java.base/$ForEachOp$OfRef.accept(
    at java.base/$3$1.accept(
     at java.base/$3$1.accept(
    at java.base/$2$1.accept(
    at java.base/java.util.Iterator.forEachRemaining(
    at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(
     at java.base/
     at java.base/
     at java.base/$ForEachOp.evaluateSequential(
    at java.base/$ForEachOp$OfRef.evaluateSequential(
    at java.base/
     at java.base/
     at io.confluent.ksql.function.UserFunctionLoader.load(

My Gradle Build file has the following dependecies:

dependencies {
implementation "io.confluent.ksql:ksqldb-udf:7.3.0"
implementation "org.apache.kafka:kafka_2.13:2.5.0"
implementation "org.apache.kafka:connect-api:3.3.1"}

I really dont know why KSQLDB cant parse the Schema (Its a provided example class from KSQLDB)...


  • I am able to submit the UDAF with param,aggregate and return Schema without an annotationParser Exception after downgrading from:

        implementation "io.confluent.ksql:ksqldb-udf:7.3.0"


        implementation "io.confluent.ksql:ksqldb-udf:5.5.1"