When I use Apache Flink's KafkaRecordSerializationSchema with the Confluent schema registry serialization, the registryConfigs settings, such as auto.register.schemas or avro.remove.java.properties, are not taken into account:
package flink;
import example.avro.Car;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Map;
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS;
import static io.confluent.kafka.serializers.KafkaAvroSerializerConfig.AVRO_REMOVE_JAVA_PROPS_CONFIG;
public class Jobflink {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.AT_LEAST_ONCE);
Map<String, String> schemaSettings = Map.of(
AVRO_REMOVE_JAVA_PROPS_CONFIG, "true",
AUTO_REGISTER_SCHEMAS, "false"
);
KafkaSink<Car> sink = KafkaSink.<Car>builder()
.setBootstrapServers("kafka-local:29092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setValueSerializationSchema(
ConfluentRegistryAvroSerializationSchema.forSpecific(
Car.class,
"toto-value",
"http://confluent-schema-registry-local:8081",
schemaSettings))
.setTopic("toto")
.build())
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
// NOTE: a DataStream<Car> (elided here) must be wired to the sink via
// cars.sinkTo(sink) for the job to actually write anything
env.execute();
}
}
I also tried to set the Confluent schema registry settings with setKafkaProducerConfig, but that does not work either:
Properties properties = new Properties();
properties.setProperty(AVRO_REMOVE_JAVA_PROPS_CONFIG, "true");
properties.setProperty(AUTO_REGISTER_SCHEMAS, "false");
KafkaSink<Car> sink = KafkaSink.<Car>builder()
.setBootstrapServers("kafka-local:29092")
.setKafkaProducerConfig(properties)
...
This feature is not yet available; for the moment only the SSL settings in registryConfigs are taken into account:
https://issues.apache.org/jira/browse/FLINK-33045
Passing the settings through setKafkaProducerConfig cannot work either: the KafkaSink serializes records itself and hands ready-made byte arrays to the Kafka producer, so Confluent serializer settings placed in the producer config are never read.
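For context, the upstream coder registers the schema unconditionally; registryConfigs is only passed to the underlying CachedSchemaRegistryClient, which is why only client-level settings such as SSL have any effect. A paraphrased sketch of the upstream ConfluentSchemaRegistryCoder.writeSchema:

public void writeSchema(Schema schema, OutputStream out) throws IOException {
    try {
        // unconditional register() call: auto.register.schemas is never consulted
        int registeredId = schemaRegistryClient.register(subject, new AvroSchema(schema));
        out.write(CONFLUENT_MAGIC_BYTE);
        out.write(ByteBuffer.allocate(4).putInt(registeredId).array());
    } catch (RestClientException e) {
        throw new IOException("Could not register schema in registry", e);
    }
}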
Here is a working patch, SafeConfluentRegistryAvroSerializationSchema:
/*
 * Copy-paste of ConfluentRegistryAvroSerializationSchema, except that the coder
 * never registers the schema (it only looks up the id of an already registered
 * one) and it calls removeProperty(node, "avro.java.string") before the lookup.
 */
package flink;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.utils.JacksonMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.formats.avro.AvroSerializationSchema;
import org.apache.flink.formats.avro.RegistryAvroSerializationSchema;
import org.apache.flink.formats.avro.SchemaCoder;
import javax.annotation.Nullable;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import static java.lang.String.format;
class ConfluentSchemaRegistryCoder implements SchemaCoder {
private final SchemaRegistryClient schemaRegistryClient;
private String subject;
private static final int CONFLUENT_MAGIC_BYTE = 0;
/**
 * Creates a {@link SchemaCoder} that uses the provided {@link SchemaRegistryClient} to
 * connect to the schema registry.
 *
 * @param schemaRegistryClient client used to connect to the schema registry
 * @param subject schema registry subject to produce to
 */
public ConfluentSchemaRegistryCoder(String subject, SchemaRegistryClient schemaRegistryClient) {
this.schemaRegistryClient = schemaRegistryClient;
this.subject = subject;
}
/**
 * Creates a {@link SchemaCoder} that uses the provided {@link SchemaRegistryClient} to
 * connect to the schema registry.
 *
 * @param schemaRegistryClient client used to connect to the schema registry
 */
public ConfluentSchemaRegistryCoder(SchemaRegistryClient schemaRegistryClient) {
this.schemaRegistryClient = schemaRegistryClient;
}
@Override
public Schema readSchema(InputStream in) throws IOException {
DataInputStream dataInputStream = new DataInputStream(in);
if (dataInputStream.readByte() != CONFLUENT_MAGIC_BYTE) {
throw new IOException("Unknown data format. Magic number does not match");
} else {
int schemaId = dataInputStream.readInt();
try {
return schemaRegistryClient.getById(schemaId);
} catch (RestClientException e) {
throw new IOException(
format("Could not find schema with id %s in registry", schemaId), e);
}
}
}
/** Recursively removes {@code propertyName} from every object node of a schema JSON tree. */
private static void removeProperty(JsonNode node, String propertyName) {
if (node.isObject()) {
ObjectNode objectNode = (ObjectNode) node;
objectNode.remove(propertyName);
Iterator<JsonNode> elements = objectNode.elements();
while (elements.hasNext()) {
removeProperty(elements.next(), propertyName);
}
} else if (node.isArray()) {
ArrayNode arrayNode = (ArrayNode) node;
Iterator<JsonNode> elements = arrayNode.elements();
while (elements.hasNext()) {
removeProperty(elements.next(), propertyName);
}
}
}
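// Illustration (hypothetical field): classes generated by the Avro compiler carry
// Java string hints such as
//   {"name":"model","type":{"type":"string","avro.java.string":"String"}}
// after removeProperty(node, "avro.java.string") this is equivalent to
//   {"name":"model","type":"string"}
// i.e. the schema a non-Java producer would have registered.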
@Override
public void writeSchema(Schema schema, OutputStream out) throws IOException {
    try {
        // strip the Java string hints that Avro-generated classes embed, so the
        // schema matches what is stored in the registry
        // (the avro.remove.java.properties behavior)
        ObjectMapper jsonMapper = JacksonMapper.INSTANCE;
        JsonNode node = jsonMapper.readTree(schema.toString());
        removeProperty(node, "avro.java.string");
        AvroSchema avroSchema = new AvroSchema(node.toString());
        // only look up the id of an already registered schema, never register it
        // (the auto.register.schemas=false behavior)
        int registeredId = schemaRegistryClient.getId(subject, avroSchema);
        out.write(CONFLUENT_MAGIC_BYTE);
        byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
        out.write(schemaIdBytes);
    } catch (RestClientException e) {
        throw new IOException("Could not find schema in registry", e);
    }
}
}
class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {
private static final long serialVersionUID = 8610401613495438381L;
private final String subject;
private final String url;
private final int identityMapCapacity;
private final @Nullable Map<String, ?> registryConfigs;
CachedSchemaCoderProvider(String url, int identityMapCapacity) {
this(null, url, identityMapCapacity, null);
}
CachedSchemaCoderProvider(
@Nullable String subject,
String url,
int identityMapCapacity,
@Nullable Map<String, ?> registryConfigs) {
this.subject = subject;
this.url = Objects.requireNonNull(url);
this.identityMapCapacity = identityMapCapacity;
this.registryConfigs = registryConfigs;
}
@Override
public SchemaCoder get() {
return new ConfluentSchemaRegistryCoder(
this.subject,
new CachedSchemaRegistryClient(url, identityMapCapacity, registryConfigs));
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CachedSchemaCoderProvider that = (CachedSchemaCoderProvider) o;
return identityMapCapacity == that.identityMapCapacity
&& Objects.equals(subject, that.subject)
&& url.equals(that.url)
&& Objects.equals(registryConfigs, that.registryConfigs);
}
@Override
public int hashCode() {
return Objects.hash(subject, url, identityMapCapacity, registryConfigs);
}
}
/**
 * Serialization schema that serializes to Avro binary format using the Confluent Schema
 * Registry. Unlike ConfluentRegistryAvroSerializationSchema, it never registers schemas
 * and strips the "avro.java.string" hints before looking the schema id up.
 *
 * @param <T> the type to be serialized
 */
public class SafeConfluentRegistryAvroSerializationSchema<T>
extends RegistryAvroSerializationSchema<T> {
private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
private static final long serialVersionUID = -1771641202177852775L;
/**
 * Creates an Avro serialization schema.
*
* @param recordClazz class to serialize. Should be either {@link SpecificRecord} or {@link
* GenericRecord}.
* @param schema writer's Avro schema. Should be provided if recordClazz is {@link
* GenericRecord}
* @param schemaCoderProvider provider for schema coder that writes the writer schema to
* Confluent Schema Registry
*/
private SafeConfluentRegistryAvroSerializationSchema(
Class<T> recordClazz,
Schema schema,
SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
super(recordClazz, schema, schemaCoderProvider);
}
/**
 * Creates an {@link AvroSerializationSchema} that produces Avro-encoded byte arrays and
 * looks the writer schema up in the Confluent Schema Registry (the schema is never
 * registered).
 *
 * @param tClass the type to be serialized
 * @param subject schema registry subject to produce to
 * @param schemaRegistryUrl URL of the schema registry to connect to
 * @return the serialization schema
 */
public static <T extends SpecificRecord>
SafeConfluentRegistryAvroSerializationSchema<T> forSpecific(
Class<T> tClass, String subject, String schemaRegistryUrl) {
return forSpecific(tClass, subject, schemaRegistryUrl, null);
}
/**
 * Creates an {@link AvroSerializationSchema} that produces Avro-encoded byte arrays and
 * looks the writer schema up in the Confluent Schema Registry (the schema is never
 * registered).
 *
 * @param tClass the type to be serialized
 * @param subject schema registry subject to produce to
 * @param schemaRegistryUrl URL of the schema registry to connect to
 * @param registryConfigs map with additional schema registry configs (for example SSL
 *     properties)
 * @return the serialization schema
 */
public static <T extends SpecificRecord>
SafeConfluentRegistryAvroSerializationSchema<T> forSpecific(
Class<T> tClass,
String subject,
String schemaRegistryUrl,
@Nullable Map<String, ?> registryConfigs) {
return new SafeConfluentRegistryAvroSerializationSchema<>(
tClass,
null,
new CachedSchemaCoderProvider(
subject,
schemaRegistryUrl,
DEFAULT_IDENTITY_MAP_CAPACITY,
registryConfigs));
}
/**
 * Creates an {@link AvroSerializationSchema} that produces Avro-encoded byte arrays and
 * looks the writer schema up in the Confluent Schema Registry (the schema is never
 * registered).
 *
 * @param subject schema registry subject to produce to
 * @param schema schema that will be used for serialization
 * @param schemaRegistryUrl URL of the schema registry to connect to
 * @return the serialization schema
 */
public static SafeConfluentRegistryAvroSerializationSchema<GenericRecord> forGeneric(
String subject, Schema schema, String schemaRegistryUrl) {
return forGeneric(subject, schema, schemaRegistryUrl, null);
}
/**
 * Creates an {@link AvroSerializationSchema} that produces Avro-encoded byte arrays and
 * looks the writer schema up in the Confluent Schema Registry (the schema is never
 * registered).
 *
 * @param subject schema registry subject to produce to
 * @param schema schema that will be used for serialization
 * @param schemaRegistryUrl URL of the schema registry to connect to
 * @param registryConfigs map with additional schema registry configs (for example SSL
 *     properties)
 * @return the serialization schema
 */
public static SafeConfluentRegistryAvroSerializationSchema<GenericRecord> forGeneric(
String subject,
Schema schema,
String schemaRegistryUrl,
@Nullable Map<String, ?> registryConfigs) {
return new SafeConfluentRegistryAvroSerializationSchema<>(
GenericRecord.class,
schema,
new CachedSchemaCoderProvider(
subject,
schemaRegistryUrl,
DEFAULT_IDENTITY_MAP_CAPACITY,
registryConfigs));
}
}
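With this class on the job's classpath, the sink from the question only needs to swap the factory. A minimal sketch reusing the broker, topic, and registry URL from the question; the last argument is the registryConfigs map (for example SSL properties) and may be null:

KafkaSink<Car> sink = KafkaSink.<Car>builder()
        .setBootstrapServers("kafka-local:29092")
        .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                        .setValueSerializationSchema(
                                SafeConfluentRegistryAvroSerializationSchema.forSpecific(
                                        Car.class,
                                        "toto-value",
                                        "http://confluent-schema-registry-local:8081",
                                        null)) // registryConfigs, e.g. SSL properties
                        .setTopic("toto")
                        .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();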