The following error appears when attempting to use Confluent Platform CLI tools to read messages from Kafka.
[2023-01-17T18:00:14.960189+0100] [2023-01-17 18:00:14,957] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:105)
[2023-01-17T18:00:14.960210+0100] org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 119
[2023-01-17T18:00:14.960230+0100] Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 40101
[2023-01-17T18:00:14.960249+0100] at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
[2023-01-17T18:00:14.960272+0100] at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:188)
[2023-01-17T18:00:14.960293+0100] at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:330)
[2023-01-17T18:00:14.960312+0100] at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:323)
[2023-01-17T18:00:14.960332+0100] at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:63)
[2023-01-17T18:00:14.960353+0100] at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndID(CachedSchemaRegistryClient.java:118)
[2023-01-17T18:00:14.960372+0100] at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
[2023-01-17T18:00:14.960391+0100] at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
[2023-01-17T18:00:14.960412+0100] at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:120)
[2023-01-17T18:00:14.960431+0100] at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:112)
[2023-01-17T18:00:14.960449+0100] at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:137)
[2023-01-17T18:00:14.960468+0100] at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
[2023-01-17T18:00:14.960487+0100] at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:50)
[2023-01-17T18:00:14.960506+0100] at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
I am using Kafka 3.2 (both client and server), with a Karapace schema registry by Aiven. I can query the schema registry manually using curl
by including the credentials in the URL:
(base) me@my-laptop:~$ curl https://$SCHEMA_REGISTRY_USER:$SCHEMA_REGISTRY_PASSWORD@$SCHEMA_REGISTRY_HOST:$SCHEMA_REGISTRY_PORT/subjects
["my-topic-" <redacted>
Or as basic auth in a header:
(base) me@my-laptop:~$ curl -u "$SCHEMA_REGISTRY_USER:$SCHEMA_REGISTRY_PASSWORD" https://$SCHEMA_REGISTRY_HOST:$SCHEMA_REGISTRY_PORT/subjects
["my-topic-" <redacted>
The error seems to happen when the credentials are not passed to the schema registry:
(base) me@my-laptop:~$ curl https://$SCHEMA_REGISTRY_HOST:$SCHEMA_REGISTRY_PORT/subjects
{"error_code": 40101, "message": "Unauthorized"}
According to official docs for kafka-avro-console-consumer
, I can use the authentication source URL
or USER_INFO
, and it should pass those credentials to the schema registry. This does not work, and causes the above error.
kafka-avro-console-consumer \
--bootstrap-server $KAFKA_HOST:$KAFKA_PORT \
--consumer.config /home/guido/.tls/kafka/client-tls.properties \
--property schema.registry.url=https://$SCHEMA_REGISTRY_USER:$SCHEMA_REGISTRY_PASSWORD@$SCHEMA_REGISTRY_HOST:$SCHEMA_REGISTRY_PORT \
--property basic.auth.credentials.source=URL \
--topic my-topic
I've tried every combination I can think of, with URL, USER_INFO, separate credentials, prefixed with schema.registry
and without, but all lead to the same error. When I use the regular kafka-console-consumer.sh
the same settings work, but I see the Kafka messages as a byte stream, rather than the deserialized Avro message that I'm looking for.
EDIT: it appears that java.net.HttpURLConnection is the problem. It strips credentials from the URL, and the version of schema-registry-client packaged with Confluent Platform does not support any other version of Basic Authentication yet.
import java.net.URL
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
class ExampleTest extends AnyFlatSpec with Matchers {
behavior.of("Example")
it should "work" in {
val url = "https://username:p4ssw0rd@kafka.example.com:12345"
val connection = new URL(url).openConnection()
noException shouldBe thrownBy {
connection.getInputStream
}
}
}
The test fails
Found it. There were three causes for my problem.
confluent-platform-2.11
. This version did not yet support any schema registry authentication, beyond username and password in the URL.sun.net.www.protocol.http.HttpURLConnection
, does not support credentials in the URL. They are stripped before making the request, despite the URL correctly containing the credentials.The correct solution was to upgrade to a later version of Confluent Platform.
See https://docs.confluent.io/platform/current/installation/installing_cp/deb-ubuntu.html#configure-cp