java apache-flink avro confluent-schema-registry

Flink ConfluentRegistryAvroSerializationSchema not respecting registryConfigs


When I use KafkaRecordSerializationSchema in Apache Flink with settings for the schema registry serialization, the registryConfigs settings are not taken into account.

This affects settings such as

auto.register.schemas or avro.remove.java.properties:

package flink;

import example.avro.Car;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Map;

import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS;
import static io.confluent.kafka.serializers.KafkaAvroSerializerConfig.AVRO_REMOVE_JAVA_PROPS_CONFIG;

/**
 * Minimal reproduction job: builds a {@link KafkaSink} for {@code Car} records whose value
 * serializer is a {@link ConfluentRegistryAvroSerializationSchema} configured with extra
 * schema-registry settings.
 */
public class Jobflink {

  /**
   * Configures checkpointing, builds the Kafka sink and executes the environment.
   *
   * @param args command-line arguments (unused)
   * @throws Exception if the job execution fails
   */
  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(1000);
    env.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.AT_LEAST_ONCE);

    // Settings handed to the serialization schema as its registryConfigs argument.
    final Map<String, String> registryConfigs = Map.of(
      AVRO_REMOVE_JAVA_PROPS_CONFIG, "true",
      AUTO_REGISTER_SCHEMAS, "false"
    );

    // Value serializer: Avro with the schema id resolved against the registry subject "toto-value".
    final ConfluentRegistryAvroSerializationSchema<Car> valueSchema =
      ConfluentRegistryAvroSerializationSchema.forSpecific(
        Car.class,
        "toto-value",
        "http://confluent-schema-registry-local:8081",
        registryConfigs);

    final KafkaRecordSerializationSchema<Car> recordSerializer =
      KafkaRecordSerializationSchema.builder()
        .setValueSerializationSchema(valueSchema)
        .setTopic("toto")
        .build();

    // NOTE: in this excerpt the sink is only built; no stream is connected to it.
    KafkaSink<Car> sink = KafkaSink.<Car>builder()
      .setBootstrapServers("kafka-local:29092")
      .setRecordSerializer(recordSerializer)
      .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
      .build();

    env.execute();
  }
}

I also tried passing the Confluent Schema Registry settings through setKafkaProducerConfig, but that does not work either:

// Second attempt: supply the same two settings as Kafka producer properties
// (via setKafkaProducerConfig) instead of through registryConfigs.
Properties properties = new Properties();
properties.setProperty(AVRO_REMOVE_JAVA_PROPS_CONFIG, "true");
properties.setProperty(AUTO_REGISTER_SCHEMAS, "false");

KafkaSink<Car> sink = KafkaSink.<Car>builder()
  .setBootstrapServers("kafka-local:29092")
  .setKafkaProducerConfig(properties)
  ...

Solution

  • This feature is not available yet; for the moment, only SSL settings work in registryConfigs:

    https://issues.apache.org/jira/browse/FLINK-33045

    Here is a working patch, SafeConfluentRegistryAvroSerializationSchema:

    /*
     Copy-paste of ConfluentRegistryAvroSerializationSchema,

     except that it does not register the schema and calls
     removeProperty(node, "avro.java.string") before looking up the schema id.
     */
    package flink;
    
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ArrayNode;
    import com.fasterxml.jackson.databind.node.ObjectNode;
    import io.confluent.kafka.schemaregistry.avro.AvroSchema;
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
    import io.confluent.kafka.schemaregistry.utils.JacksonMapper;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.specific.SpecificRecord;
    import org.apache.flink.formats.avro.AvroSerializationSchema;
    import org.apache.flink.formats.avro.RegistryAvroSerializationSchema;
    import org.apache.flink.formats.avro.SchemaCoder;
    
    import javax.annotation.Nullable;
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.nio.ByteBuffer;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Objects;
    
    import static java.lang.String.format;
    
    /**
     * {@link SchemaCoder} that reads and writes the schema-id header (one magic byte followed by a
     * 4-byte schema id) with the help of a Confluent {@link SchemaRegistryClient}.
     *
     * <p>When writing, the id is looked up with {@code getId} for the configured subject; this
     * coder never registers a schema. Every {@code avro.java.string} property is stripped from the
     * writer schema before the lookup.
     */
    class ConfluentSchemaRegistryCoder implements SchemaCoder {

        private static final int CONFLUENT_MAGIC_BYTE = 0;
        private static final String JAVA_STRING_PROPERTY = "avro.java.string";

        private final SchemaRegistryClient schemaRegistryClient;
        /** Subject used for id lookups in {@link #writeSchema}; unset for read-only coders. */
        private final @Nullable String subject;

        // Single-entry memo for writeSchema: the last writer schema instance seen and its
        // property-stripped counterpart. Avoids re-parsing the schema JSON when the same Schema
        // instance is written again (presumably once per record; verify against the caller).
        // Not synchronized: assumes a coder instance is used by one thread at a time.
        private Schema lastWriterSchema;
        private Schema lastLookupSchema;

        /**
         * Creates {@link SchemaCoder} that uses provided {@link SchemaRegistryClient} to connect to
         * schema registry.
         *
         * @param subject subject of schema registry to produce
         * @param schemaRegistryClient client to connect schema registry
         */
        public ConfluentSchemaRegistryCoder(String subject, SchemaRegistryClient schemaRegistryClient) {
            this.schemaRegistryClient = schemaRegistryClient;
            this.subject = subject;
        }

        /**
         * Creates {@link SchemaCoder} without a subject, using the provided {@link
         * SchemaRegistryClient} to connect to schema registry.
         *
         * @param schemaRegistryClient client to connect schema registry
         */
        public ConfluentSchemaRegistryCoder(SchemaRegistryClient schemaRegistryClient) {
            this(null, schemaRegistryClient);
        }

        /**
         * Reads the magic byte and the schema id from the stream and fetches that schema by id.
         *
         * @param in stream positioned at the start of the header
         * @return the schema returned by the registry client for the id found in the header
         * @throws IOException if the magic byte does not match, the stream cannot be read, or the
         *     registry client reports an error for the id
         */
        @Override
        public Schema readSchema(InputStream in) throws IOException {
            DataInputStream dataInputStream = new DataInputStream(in);

            if (dataInputStream.readByte() != CONFLUENT_MAGIC_BYTE) {
                throw new IOException("Unknown data format. Magic number does not match");
            }

            int schemaId = dataInputStream.readInt();
            try {
                return schemaRegistryClient.getById(schemaId);
            } catch (RestClientException e) {
                throw new IOException(
                        format("Could not find schema with id %s in registry", schemaId), e);
            }
        }

        /**
         * Recursively removes {@code propertyName} from {@code node} and from every object nested
         * below it, descending through both object values and array elements.
         */
        private static void removeProperty(JsonNode node, String propertyName) {
            if (node.isObject()) {
                ObjectNode objectNode = (ObjectNode) node;
                objectNode.remove(propertyName);
                Iterator<JsonNode> elements = objectNode.elements();
                while (elements.hasNext()) {
                    removeProperty(elements.next(), propertyName);
                }
            } else if (node.isArray()) {
                ArrayNode arrayNode = (ArrayNode) node;
                Iterator<JsonNode> elements = arrayNode.elements();
                while (elements.hasNext()) {
                    removeProperty(elements.next(), propertyName);
                }
            }
        }

        /**
         * Returns {@code schema} re-parsed without any {@code avro.java.string} property, reusing
         * the previous result when the same schema instance is passed again.
         */
        private Schema lookupSchemaFor(Schema schema) throws IOException {
            if (schema != lastWriterSchema || lastLookupSchema == null) {
                ObjectMapper jsonMapper = JacksonMapper.INSTANCE;
                JsonNode node = jsonMapper.readTree(schema.toString());
                removeProperty(node, JAVA_STRING_PROPERTY);
                lastLookupSchema = new AvroSchema(node.toString()).rawSchema();
                lastWriterSchema = schema;
            }
            return lastLookupSchema;
        }

        /**
         * Looks up the id of {@code schema} (with {@code avro.java.string} properties removed)
         * under the configured subject and writes the magic byte followed by the 4-byte id.
         *
         * @param schema writer schema whose id should be written
         * @param out stream receiving the header
         * @throws IOException if the schema JSON cannot be processed, the stream cannot be
         *     written, or the registry client reports an error for the lookup
         */
        @Override
        public void writeSchema(Schema schema, OutputStream out) throws IOException {
            try {
                int registeredId = schemaRegistryClient.getId(subject, lookupSchemaFor(schema));
                out.write(CONFLUENT_MAGIC_BYTE);
                byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
                out.write(schemaIdBytes);
            } catch (RestClientException e) {
                // This coder only looks the id up; it does not register the schema.
                throw new IOException(
                        format("Could not look up schema id for subject %s in registry", subject),
                        e);
            }
        }
    }
    
    
    /**
     * Provider that creates a {@link ConfluentSchemaRegistryCoder} backed by a new {@link
     * CachedSchemaRegistryClient} on every {@link #get()} call.
     *
     * <p>Two providers are equal when subject, URL, identity map capacity and registry configs all
     * match.
     */
    class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {

        private static final long serialVersionUID = 8610401613495438381L;
        private final String subject;
        private final String url;
        private final int identityMapCapacity;
        private final @Nullable Map<String, ?> registryConfigs;

        /**
         * Creates a provider without a subject and without additional registry configs.
         *
         * @param url URL of the schema registry; must not be null
         * @param identityMapCapacity capacity passed to the cached registry client
         */
        CachedSchemaCoderProvider(String url, int identityMapCapacity) {
            this(null, url, identityMapCapacity, null);
        }

        /**
         * Creates a provider.
         *
         * @param subject subject handed to the created coders, may be null
         * @param url URL of the schema registry; must not be null
         * @param identityMapCapacity capacity passed to the cached registry client
         * @param registryConfigs additional configs passed to the cached registry client, may be
         *     null
         */
        CachedSchemaCoderProvider(
                @Nullable String subject,
                String url,
                int identityMapCapacity,
                @Nullable Map<String, ?> registryConfigs) {
            this.subject = subject;
            this.url = Objects.requireNonNull(url);
            this.identityMapCapacity = identityMapCapacity;
            this.registryConfigs = registryConfigs;
        }

        /** Builds a fresh registry client and wraps it in a coder for this provider's subject. */
        @Override
        public SchemaCoder get() {
            final CachedSchemaRegistryClient registryClient =
                    new CachedSchemaRegistryClient(url, identityMapCapacity, registryConfigs);
            return new ConfluentSchemaRegistryCoder(this.subject, registryClient);
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            final CachedSchemaCoderProvider other = (CachedSchemaCoderProvider) o;
            if (identityMapCapacity != other.identityMapCapacity) {
                return false;
            }
            if (!Objects.equals(subject, other.subject)) {
                return false;
            }
            if (!url.equals(other.url)) {
                return false;
            }
            return Objects.equals(registryConfigs, other.registryConfigs);
        }

        @Override
        public int hashCode() {
            return Objects.hash(subject, url, identityMapCapacity, registryConfigs);
        }
    }
    
    
    /**
     * Avro serialization schema that resolves schema ids through Confluent Schema Registry using
     * the {@link CachedSchemaCoderProvider} declared in this file.
     *
     * <p>Instances are obtained from the static {@code forSpecific} / {@code forGeneric}
     * factories; the constructor is private.
     *
     * @param <T> the type to be serialized
     */
    public class SafeConfluentRegistryAvroSerializationSchema<T>
            extends RegistryAvroSerializationSchema<T> {

        private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;

        private static final long serialVersionUID = -1771641202177852775L;

        /**
         * Passes all arguments straight to {@link RegistryAvroSerializationSchema}.
         *
         * @param recordClazz class to serialize. Should be either {@link SpecificRecord} or {@link
         *     GenericRecord}.
         * @param schema writer's Avro schema. Should be provided if recordClazz is {@link
         *     GenericRecord}
         * @param schemaCoderProvider provider of the schema coder that writes the schema header
         */
        private SafeConfluentRegistryAvroSerializationSchema(
                Class<T> recordClazz,
                Schema schema,
                SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
            super(recordClazz, schema, schemaCoderProvider);
        }

        /**
         * Builds the coder provider shared by all factories, always using {@link
         * #DEFAULT_IDENTITY_MAP_CAPACITY}.
         */
        private static CachedSchemaCoderProvider coderProviderFor(
                String subject, String schemaRegistryUrl, @Nullable Map<String, ?> registryConfigs) {
            return new CachedSchemaCoderProvider(
                    subject, schemaRegistryUrl, DEFAULT_IDENTITY_MAP_CAPACITY, registryConfigs);
        }

        /**
         * Creates a serialization schema for a {@link SpecificRecord} class without additional
         * registry configs.
         *
         * @param tClass the type to be serialized
         * @param subject schema registry subject used when producing
         * @param schemaRegistryUrl URL of schema registry to connect
         * @return serialization schema for {@code tClass}
         */
        public static <T extends SpecificRecord>
                SafeConfluentRegistryAvroSerializationSchema<T> forSpecific(
                        Class<T> tClass, String subject, String schemaRegistryUrl) {
            return forSpecific(tClass, subject, schemaRegistryUrl, null);
        }

        /**
         * Creates a serialization schema for a {@link SpecificRecord} class. No writer schema is
         * passed to the superclass ({@code null}).
         *
         * @param tClass the type to be serialized
         * @param subject schema registry subject used when producing
         * @param schemaRegistryUrl URL of schema registry to connect
         * @param registryConfigs map with additional schema registry configs (for example SSL
         *     properties), may be null
         * @return serialization schema for {@code tClass}
         */
        public static <T extends SpecificRecord>
                SafeConfluentRegistryAvroSerializationSchema<T> forSpecific(
                        Class<T> tClass,
                        String subject,
                        String schemaRegistryUrl,
                        @Nullable Map<String, ?> registryConfigs) {
            final SchemaCoder.SchemaCoderProvider coderProvider =
                    coderProviderFor(subject, schemaRegistryUrl, registryConfigs);
            return new SafeConfluentRegistryAvroSerializationSchema<>(tClass, null, coderProvider);
        }

        /**
         * Creates a serialization schema for {@link GenericRecord} without additional registry
         * configs.
         *
         * @param subject schema registry subject used when producing
         * @param schema schema that will be used for serialization
         * @param schemaRegistryUrl URL of schema registry to connect
         * @return serialization schema for {@link GenericRecord}
         */
        public static SafeConfluentRegistryAvroSerializationSchema<GenericRecord> forGeneric(
                String subject, Schema schema, String schemaRegistryUrl) {
            return forGeneric(subject, schema, schemaRegistryUrl, null);
        }

        /**
         * Creates a serialization schema for {@link GenericRecord} using the given writer schema.
         *
         * @param subject schema registry subject used when producing
         * @param schema schema that will be used for serialization
         * @param schemaRegistryUrl URL of schema registry to connect
         * @param registryConfigs map with additional schema registry configs (for example SSL
         *     properties), may be null
         * @return serialization schema for {@link GenericRecord}
         */
        public static SafeConfluentRegistryAvroSerializationSchema<GenericRecord> forGeneric(
                String subject,
                Schema schema,
                String schemaRegistryUrl,
                @Nullable Map<String, ?> registryConfigs) {
            final SchemaCoder.SchemaCoderProvider coderProvider =
                    coderProviderFor(subject, schemaRegistryUrl, registryConfigs);
            return new SafeConfluentRegistryAvroSerializationSchema<>(
                    GenericRecord.class, schema, coderProvider);
        }
    }