apache-kafka, apache-kafka-streams, avro, confluent-schema-registry, confluent-cloud

Kafka Streams Avro Consumer is not able to retrieve Avro schema for id 100007


I am running a Kafka Streams consumer that is trying to consume Avro records from Confluent Cloud. I keep getting the error Error retrieving Avro unknown schema for id 100007, caused by Unauthorized; error code: 401. I have a streams.properties file in src/main/resources/.

Here is my error message:

org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 100007
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:341)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:113)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:66)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:38)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:303)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:960)
    at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1068)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:962)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:751)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:836)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:809)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:277)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:409)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:330)
    ... 16 more

And here is the Kafka Streams code:

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import com.kinsaleins.avro.POCEntity;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;


public class streams {

    public static void main(String[] args) throws IOException {

        StreamsBuilder builder = new StreamsBuilder();

        Properties properties = new Properties();

        // Load shared settings from src/main/resources/streams.properties;
        // try-with-resources closes the stream after loading.
        try (InputStream in = streams.class.getClassLoader().getResourceAsStream("streams.properties")) {
            properties.load(in);
        }

        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-app");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "pkc-2396y.us-east-1.aws.confluent.cloud:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);

        final String inputTopic = properties.getProperty("producer.send.topic");

        final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
            "https://psrc-4xgzx.us-east-2.aws.confluent.cloud");

        final Serde<String> stringSerde = Serdes.String();
        final Serde<POCEntity> valueAvroSerde = new SpecificAvroSerde<>();
        valueAvroSerde.configure(serdeConfig, false);


        KStream<String, POCEntity> firstStream = builder.stream(inputTopic, Consumed.with(stringSerde, valueAvroSerde));
        firstStream.peek((key, value) -> System.out.println("key " + key + " value " + value));

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
        kafkaStreams.start();

    }

}

I don't understand what I am doing wrong. I have followed the instructions from https://docs.confluent.io/platform/current/streams/developer-guide/datatypes.html#avro, https://docs.confluent.io/cloud/current/cp-component/streams-cloud-config.html, https://www.youtube.com/watch?v=LxxeXI1mPKo, https://www.youtube.com/watch?v=DOBMB0L0oKQ&list=PLa7VYi0yPIH35IrbJ7Y0U2YLrR9u4QO-s&index=4, and https://github.com/confluentinc/kafka-streams-examples/tree/7.1.1-post/src/main/java/io/confluent/examples/streams (I looked at the Avro examples there for guidance).

Completely at a loss here.


Solution

  • As the error says: Unauthorized.

    You have not given any authentication settings to your Avro serde config.

    Note from the docs: you need basic.auth.credentials.source together with schema.registry.basic.auth.user.info. The rest of the links you've listed appear to be "local development / getting started" guides and don't cover security configuration.

    Similarly, you need SASL properties in your properties variable to connect to the actual broker, assuming those are not already part of the streams.properties file. A sketch of both is shown after this answer.

    Confluent Cloud requires authentication, and the values for those settings are shown in your cluster dashboard.

    If there were no authentication, anyone could copy the code in your question and start sending/consuming random data ;)
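
    Since your streams.properties isn't shown, here is a minimal, untested sketch of both pieces, assuming SASL/PLAIN against the broker and basic auth against Schema Registry. The class name and the <...> key/secret values are placeholders, not real credentials; the real values come from your Confluent Cloud cluster and Schema Registry dashboards.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import com.kinsaleins.avro.POCEntity;
    import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.common.config.SaslConfigs;

    public class AuthConfigSketch {

        public static void main(String[] args) {
            // Broker (SASL) settings that belong in the Kafka Streams Properties.
            Properties properties = new Properties();
            properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            // <CLUSTER_API_KEY> / <CLUSTER_API_SECRET> are placeholders for the
            // cluster API key and secret from the Confluent Cloud dashboard.
            properties.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                    + "username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";");

            // Schema Registry authentication for the Avro serde config.
            Map<String, String> serdeConfig = new HashMap<>();
            serdeConfig.put("schema.registry.url",
                "https://psrc-4xgzx.us-east-2.aws.confluent.cloud");
            serdeConfig.put("basic.auth.credentials.source", "USER_INFO");
            // <SR_API_KEY>:<SR_API_SECRET> is a placeholder for the Schema Registry
            // API key and secret (a separate credential from the cluster key).
            serdeConfig.put("schema.registry.basic.auth.user.info",
                "<SR_API_KEY>:<SR_API_SECRET>");

            SpecificAvroSerde<POCEntity> valueAvroSerde = new SpecificAvroSerde<>();
            valueAvroSerde.configure(serdeConfig, false); // false = configuring a value serde
        }
    }

    One more thing to watch: because your code also sets DEFAULT_VALUE_SERDE_CLASS_CONFIG, Streams will construct that default serde from the Streams Properties themselves, so the schema.registry.url and basic-auth entries need to appear there as well, not only in the serdeConfig map you pass to configure().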