
Flink StateFun and Confluent Schema Registry compatibility


I'm trying to egress to Confluent Kafka from Flink StateFun. In the Confluent Git repo, all we need to do to check data against the schema and write it to a Kafka topic is use a Kafka client ProducerRecord with an Avro object.

But in StateFun, the Kafka egress requires overriding the "ProducerRecord<byte[], byte[]> serialize" method of KafkaEgressSerializer. This causes the following error:

Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "bytes"

Schema Registry and the StateFun Kafka egress seem to be incompatible. Is there any workaround?


Solution

  • It is possible to use Confluent Schema Registry with the StateFun Kafka egress.

    To do so, first register your schema manually with the schema registry, then have your KafkaEgressSerializer return a byte[] produced by a KafkaAvroSerializer instance.

    The code below is the gist of it and follows the first of Igal's suggested workarounds:

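    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    // SpecificRecordGeneratedFromAvroSchema stands for your Avro-generated SpecificRecord class.
    public class SpecificRecordFromAvroSchemaSerializer implements KafkaEgressSerializer<SpecificRecordGeneratedFromAvroSchema> {
    
        private static final String KAFKA_TOPIC = "kafka_topic";
    
        // Assuming the default TopicNameStrategy, the value subject is "<topic>-value".
        private static final String VALUE_SUBJECT = KAFKA_TOPIC + "-value";
    
        // Client that caches schemas and IDs locally (up to 1_000 entries).
        private static final CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(
            "http://schema-registry:8081",
            1_000
        );
    
        // Turns a record into Confluent's wire format: magic byte, schema ID, Avro binary body.
        private static final KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer(schemaRegistryClient);
    
        static {
            // Register the schema manually, once, when the class is loaded.
            try {
                schemaRegistryClient.register(
                    VALUE_SUBJECT,
                    SpecificRecordGeneratedFromAvroSchema.getClassSchema()
                );
            } catch (IOException | RestClientException e) {
                // Fail fast instead of swallowing the error and failing later in serialize().
                throw new IllegalStateException("Could not register schema under subject " + VALUE_SUBJECT, e);
            }
        }
    
        @Override
        public ProducerRecord<byte[], byte[]> serialize(SpecificRecordGeneratedFromAvroSchema record) {
            // Hand the Avro record itself (not a byte[]) to the Avro serializer.
            byte[] valueData = kafkaAvroSerializer.serialize(KAFKA_TOPIC, record);
    
            // Current timestamp as the key, encoded explicitly as UTF-8.
            byte[] keyData = String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8);
    
            return new ProducerRecord<>(KAFKA_TOPIC, keyData, valueData);
        }
    }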
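The byte[] that KafkaAvroSerializer returns, and that the egress then writes to Kafka, is in Confluent's wire format: a magic byte 0, then the 4-byte big-endian schema ID, then the Avro binary body. The stdlib-only sketch below frames and parses that header, which can help when checking which schema ID a record written by the egress refers to. The class and method names are hypothetical helpers, not a Confluent API, and the body bytes in main are an arbitrary stand-in rather than real Avro data.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper (not part of any Confluent library) for the Schema Registry
// wire format: [magic byte 0x0][4-byte big-endian schema ID][Avro binary body].
public final class ConfluentWireFormat {

    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 1 + Integer.BYTES;

    private ConfluentWireFormat() {}

    // Prefix an already Avro-encoded body with the magic byte and the schema ID.
    static byte[] frame(int schemaId, byte[] avroBody) {
        return ByteBuffer.allocate(HEADER_LENGTH + avroBody.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer writes big-endian by default
                .put(avroBody)
                .array();
    }

    // Read the schema ID, rejecting payloads that do not start with the expected header.
    static int schemaId(byte[] payload) {
        if (payload.length < HEADER_LENGTH || payload[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("payload is not in Schema Registry wire format");
        }
        return ByteBuffer.wrap(payload, 1, Integer.BYTES).getInt();
    }

    // Return the Avro-encoded body that follows the 5-byte header.
    static byte[] avroBody(byte[] payload) {
        schemaId(payload); // validates the header
        return Arrays.copyOfRange(payload, HEADER_LENGTH, payload.length);
    }

    public static void main(String[] args) {
        byte[] framed = frame(42, new byte[] {2, 4, 6});
        System.out.println("schema id = " + schemaId(framed));
        System.out.println("body = " + Arrays.toString(avroBody(framed)));
    }
}
```

A consumer using KafkaAvroDeserializer reads the same header to fetch the writer schema by ID, which is why the value must come from KafkaAvroSerializer rather than plain Avro binary encoding.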
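The subject used in the manual register() call has to match the subject the serializer computes from its subject name strategy; otherwise the schema ends up under a different subject than the one the serializer looks in. The code assumes the default TopicNameStrategy. As a sketch, this hypothetical stdlib-only helper mirrors the subject names produced by Confluent's three built-in strategies, so you can pick the right one to register under. For the record-based strategies, the full name can be taken from SpecificRecordGeneratedFromAvroSchema.getClassSchema().getFullName(); "com.example.MyRecord" below is a made-up example.

```java
// Hypothetical helper (not a Confluent API) mirroring the subject names produced by
// TopicNameStrategy, RecordNameStrategy and TopicRecordNameStrategy.
public final class SubjectNames {

    enum Strategy { TOPIC_NAME, RECORD_NAME, TOPIC_RECORD_NAME }

    private SubjectNames() {}

    static String subject(Strategy strategy, String topic, String recordFullName, boolean isKey) {
        switch (strategy) {
            case TOPIC_NAME:
                // "<topic>-key" or "<topic>-value"
                return topic + (isKey ? "-key" : "-value");
            case RECORD_NAME:
                // Fully qualified record name; independent of topic and key/value
                return recordFullName;
            case TOPIC_RECORD_NAME:
                // "<topic>-<fully qualified record name>"
                return topic + "-" + recordFullName;
            default:
                throw new IllegalArgumentException("unknown strategy " + strategy);
        }
    }

    public static void main(String[] args) {
        String recordFullName = "com.example.MyRecord"; // made-up example name
        for (Strategy s : Strategy.values()) {
            System.out.println(s + " -> " + subject(s, "kafka_topic", recordFullName, false));
        }
    }
}
```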