Tags: apache-kafka, confluent-schema-registry

Error 40101 when retrieving Avro schema in kafka-avro-console-consumer


The following error appears when attempting to use Confluent Platform CLI tools to read messages from Kafka.

[2023-01-17 18:00:14,957] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:105)
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 119
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 40101
  at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
  at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:188)
  at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:330)
  at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:323)
  at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:63)
  at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndID(CachedSchemaRegistryClient.java:118)
  at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
  at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
  at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:120)
  at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:112)
  at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:137)
  at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
  at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:50)
  at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
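For context, the id in "Error retrieving Avro schema for id 119" is read from the message itself: Confluent-serialized Avro records start with a magic byte (0x00) followed by the 4-byte schema id, big-endian. A quick sketch decoding that prefix (the bytes below are illustrative, not from my topic):

```shell
# Confluent wire format: byte 0 = magic byte 0x00, bytes 1-4 = schema id (big-endian).
# Decode the id from an illustrative message prefix (octal \167 = 0x77 = 119):
printf '\0\0\0\0\167' |
  od -An -tu1 |
  awk 'NF { print ($2 * 16777216) + ($3 * 65536) + ($4 * 256) + $5 }'
# prints: 119
```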

I am using Kafka 3.2 (both client and server) with a Karapace schema registry managed by Aiven. I can query the schema registry manually with curl, embedding the credentials in the URL:

(base) me@my-laptop:~$ curl https://$SCHEMA_REGISTRY_USER:$SCHEMA_REGISTRY_PASSWORD@$SCHEMA_REGISTRY_HOST:$SCHEMA_REGISTRY_PORT/subjects
["my-topic-" <redacted>

Or by passing them as a basic auth header:

(base) me@my-laptop:~$ curl -u "$SCHEMA_REGISTRY_USER:$SCHEMA_REGISTRY_PASSWORD" https://$SCHEMA_REGISTRY_HOST:$SCHEMA_REGISTRY_PORT/subjects
["my-topic-" <redacted>
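(The -u flag simply base64-encodes user:password into an Authorization: Basic header; the same header can be built by hand. A minimal sketch, using made-up example credentials rather than the real ones:)

```shell
# Build the Authorization header that curl -u sends (example credentials, not real ones)
creds=$(printf '%s' 'user:p4ssw0rd' | base64)
printf 'Authorization: Basic %s\n' "$creds"
# prints: Authorization: Basic dXNlcjpwNHNzdzByZA==
# Equivalent request:
#   curl -H "Authorization: Basic $creds" https://$SCHEMA_REGISTRY_HOST:$SCHEMA_REGISTRY_PORT/subjects
```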

The error matches what the schema registry returns when no credentials are passed at all:

(base) me@my-laptop:~$ curl https://$SCHEMA_REGISTRY_HOST:$SCHEMA_REGISTRY_PORT/subjects
{"error_code": 40101, "message": "Unauthorized"}

According to the official docs for kafka-avro-console-consumer, I can set the authentication source to URL or USER_INFO, and the tool should pass the credentials along to the schema registry. In my case this does not work and instead causes the error above.

kafka-avro-console-consumer \
          --bootstrap-server $KAFKA_HOST:$KAFKA_PORT \
          --consumer.config /home/guido/.tls/kafka/client-tls.properties \
          --property schema.registry.url=https://$SCHEMA_REGISTRY_USER:$SCHEMA_REGISTRY_PASSWORD@$SCHEMA_REGISTRY_HOST:$SCHEMA_REGISTRY_PORT \
          --property basic.auth.credentials.source=URL \
          --topic my-topic

I've tried every combination I can think of (URL, USER_INFO, separate credential properties, with and without the schema.registry. prefix), but they all lead to the same error. With the plain kafka-console-consumer.sh the same settings work, but then I see the messages as a raw byte stream rather than the deserialized Avro records I'm after.

EDIT: it appears that java.net.HttpURLConnection is the problem. It strips credentials from the URL before making the request, and the version of the schema-registry-client packaged with my Confluent Platform does not yet support any other form of basic authentication.

import java.net.URL

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class ExampleTest extends AnyFlatSpec with Matchers {

  behavior of "Example"

  it should "work" in {
    // Credentials embedded in the URL, as basic.auth.credentials.source=URL expects
    val url = "https://username:p4ssw0rd@kafka.example.com:12345"

    val connection = new URL(url).openConnection()

    // Fails: HttpURLConnection drops the user-info part instead of sending it
    noException should be thrownBy {
      connection.getInputStream
    }
  }
}

The test fails: the credentials in the URL are silently dropped before the request is made.


Solution

  • Found it. There were three causes for my problem.

    1. I had an old version of Confluent Platform installed, namely confluent-platform-2.11. That version did not yet support any schema registry authentication beyond username and password in the URL.
    2. I thought I already had the latest version (3.3.x), but that is actually the latest version of Kafka, not of Confluent Platform.
    3. Java's default HTTP client, sun.net.www.protocol.http.HttpURLConnection, does not support credentials in the URL: they are stripped before the request is made, even though the URL itself contains them.

    The correct solution was to upgrade to a later version of Confluent Platform.

    See https://docs.confluent.io/platform/current/installation/installing_cp/deb-ubuntu.html#configure-cp
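    After the upgrade, the USER_INFO variant of the original command works as documented. A sketch of the working invocation (property names as given in the current Confluent docs; hosts and credentials are the same placeholders as above):

    ```shell
    kafka-avro-console-consumer \
      --bootstrap-server $KAFKA_HOST:$KAFKA_PORT \
      --consumer.config /home/guido/.tls/kafka/client-tls.properties \
      --property schema.registry.url=https://$SCHEMA_REGISTRY_HOST:$SCHEMA_REGISTRY_PORT \
      --property basic.auth.credentials.source=USER_INFO \
      --property basic.auth.user.info=$SCHEMA_REGISTRY_USER:$SCHEMA_REGISTRY_PASSWORD \
      --topic my-topic
    ```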