I am running a Kafka Streams consumer that is trying to consume Avro records from Confluent Cloud. I keep getting the error "Error retrieving Avro unknown schema for id 100007" and "Unauthorized; error code: 401". I have a streams.properties file in src/main/resources/.
Here is my error message:
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 100007
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:341)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:113)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:66)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:38)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:303)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:960)
at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1068)
at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:962)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:751)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:836)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:809)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:277)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:409)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:330)
... 16 more
And here is the Kafka Streams code:
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import com.kinsaleins.avro.POCEntity;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
public class streams {

    public static void main(String[] args) throws IOException {
        StreamsBuilder builder = new StreamsBuilder();

        Properties properties = new Properties();
        InputStream in = streams.class.getClassLoader().getResourceAsStream("streams.properties");
        properties.load(in);
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-app");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "pkc-2396y.us-east-1.aws.confluent.cloud:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);

        final String inputTopic = properties.getProperty("producer.send.topic");

        final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
                "https://psrc-4xgzx.us-east-2.aws.confluent.cloud");
        final Serde<String> stringSerde = Serdes.String();
        final Serde<POCEntity> valueAvroSerde = new SpecificAvroSerde<>();
        valueAvroSerde.configure(serdeConfig, false);

        KStream<String, POCEntity> firstStream = builder.stream(inputTopic, Consumed.with(stringSerde, valueAvroSerde));
        firstStream.peek((key, value) -> System.out.println("key " + key + "value " + value));

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
        kafkaStreams.start();
    }
}
I don't understand what I am doing wrong. I have followed the instructions from https://docs.confluent.io/platform/current/streams/developer-guide/datatypes.html#avro, https://docs.confluent.io/cloud/current/cp-component/streams-cloud-config.html, https://www.youtube.com/watch?v=LxxeXI1mPKo, https://www.youtube.com/watch?v=DOBMB0L0oKQ&list=PLa7VYi0yPIH35IrbJ7Y0U2YLrR9u4QO-s&index=4, and https://github.com/confluentinc/kafka-streams-examples/tree/7.1.1-post/src/main/java/io/confluent/examples/streams (I looked at the Avro examples for guidance).
Completely at a loss here.
As the error says: Unauthorized.
You have given no authentication settings in your Avro serde config. Notice the basic.auth.credentials.source and schema.registry.basic.auth.user.info settings from the docs. The rest of the links you've given appear to be "local development / getting started" material and don't cover security configuration.
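As an illustration, here is a minimal sketch of what your serde config could look like with those settings added. The <SR_API_KEY>:<SR_API_SECRET> value is a placeholder for the Schema Registry API key pair from your Confluent Cloud dashboard (a separate key pair from the Kafka cluster's):

final Map<String, String> serdeConfig = new HashMap<>();  // requires java.util.HashMap
serdeConfig.put("schema.registry.url", "https://psrc-4xgzx.us-east-2.aws.confluent.cloud");
// USER_INFO tells the Schema Registry client to read credentials from basic.auth.user.info
serdeConfig.put("basic.auth.credentials.source", "USER_INFO");
// Placeholder key pair; older serde versions name this setting
// schema.registry.basic.auth.user.info instead
serdeConfig.put("basic.auth.user.info", "<SR_API_KEY>:<SR_API_SECRET>");
valueAvroSerde.configure(serdeConfig, false);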
Similarly, you need SASL properties in your properties variable to connect to the actual broker, assuming those are not already part of the streams.properties file...
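Assuming they are missing, this is a sketch of the broker settings streams.properties would need, where <CLUSTER_API_KEY> and <CLUSTER_API_SECRET> are placeholders for the Kafka cluster API key pair (again, not the Schema Registry one):

# Confluent Cloud brokers require TLS plus SASL/PLAIN authentication
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<CLUSTER_API_KEY>' password='<CLUSTER_API_SECRET>';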
Confluent Cloud requires authentication, and the values for those settings are shown in your cluster dashboard.
If there were no authentication, anyone would be able to copy the code in your question and start sending/consuming random data ;)