apache-kafka strimzi

Unexpected Kafka request of type METADATA during SASL handshake On Linux Machine


I am trying to create a Docker Compose setup with strimzi-kafka. My docker-compose file looks like this:

# Compose file format version.
version: "3.8"

# dev_net is declared external: Compose expects it to exist already
# and does not create it.
networks:
  dev_net:
    external: true

services:
  # ZooKeeper node 1, started from the Strimzi Kafka image through the
  # start.sh script mounted at /opt/kafka/strimzi.
  zoo1:
    image: quay.io/strimzi/kafka:0.38.0-kafka-3.6.0
    container_name: zoo1
    hostname: zoo1.devops
    networks:
      - dev_net
    ports:
      # Client port, followed by the two ports listed for this node in
      # ZOOKEEPER_SERVERS.
      - "${ZOOKEEPER_PORT_1}:${ZOOKEEPER_PORT_1}"
      - "12888:12888"
      - "13888:13888"
    volumes:
      - ./docker/zookeeper/scripts:/opt/kafka/strimzi
      - ./kafka-data-new/zookeeper1:/var/lib/zookeeper/data
      - ./kafka-data-new/zookeeper-logs1:/var/lib/zookeeper/log
    command: ["/bin/bash", "-c", "cd /opt/kafka/strimzi && ./start.sh"]
    environment:
      ZOOKEEPER_SERVER_ID: "1"
      ZOOKEEPER_CLIENT_PORT: "${ZOOKEEPER_PORT_1}"
      ZOOKEEPER_SERVERS: "zoo1.devops:12888:13888;zoo2.devops:22888:23888;"
      ZOOKEEPER_TICK_TIME: "2000"
      ZOOKEEPER_INIT_LIMIT: "5"
      ZOOKEEPER_SYNC_LIMIT: "2"
      LOG_DIR: /tmp/logs

  # ZooKeeper node 2, started from the Strimzi Kafka image through the
  # start.sh script mounted at /opt/kafka/strimzi.
  zoo2:
    image: quay.io/strimzi/kafka:0.38.0-kafka-3.6.0
    container_name: zoo2
    hostname: zoo2.devops
    networks:
      - dev_net
    ports:
      # Client port, followed by the two ports listed for this node in
      # ZOOKEEPER_SERVERS.
      - "${ZOOKEEPER_PORT_2}:${ZOOKEEPER_PORT_2}"
      - "22888:22888"
      - "23888:23888"
    volumes:
      - ./docker/zookeeper/scripts:/opt/kafka/strimzi
      - ./kafka-data-new/zookeeper2:/var/lib/zookeeper/data
      - ./kafka-data-new/zookeeper-logs2:/var/lib/zookeeper/log
    command: ["/bin/bash", "-c", "cd /opt/kafka/strimzi && ./start.sh"]
    environment:
      ZOOKEEPER_SERVER_ID: "2"
      ZOOKEEPER_CLIENT_PORT: "${ZOOKEEPER_PORT_2}"
      ZOOKEEPER_SERVERS: "zoo1.devops:12888:13888;zoo2.devops:22888:23888;"
      ZOOKEEPER_TICK_TIME: "2000"
      ZOOKEEPER_INIT_LIMIT: "5"
      ZOOKEEPER_SYNC_LIMIT: "2"
      LOG_DIR: /tmp/logs


  # Kafka broker 1. This service is also the `kafka` anchor that kafka2 and
  # kafka3 merge in (`<<: *kafka`), and its environment is the
  # `kafka-environment` anchor they extend, so changes here affect all brokers
  # unless a broker overrides the key.
  kafka1: &kafka
    image: quay.io/strimzi/kafka:0.38.0-kafka-3.6.0
    hostname: kafka1.devops
    container_name: kafka1
    ports:
      # Only the EXTERNAL and PLAIN listener ports are published to the host.
      - "19092:19092"
      - "${KAFKA_PLAIN_PORT_1}:${KAFKA_PLAIN_PORT_1}"
    volumes:
      - ./docker/kafka/config:/opt/kafka/config/strimzi
      - ./docker/kafka/scripts:/opt/kafka/strimzi
      - ./kafka-data-new/broker1/data:/var/lib/kafka/data
    command:
      - /bin/bash
      - -c
      - cd /opt/kafka/strimzi && ./start.sh
    networks:
      - dev_net
    depends_on:
      - zoo1
      - zoo2
    environment: &kafka-environment
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zoo1.devops:${ZOOKEEPER_PORT_1}
      KAFKA_ADVERTISED_LISTENERS: JWT://kafka1.devops:9092,EXTERNAL://kafka1.devops:19092,PLAIN://kafka1.devops:${KAFKA_PLAIN_PORT_1}
      KAFKA_LISTENERS: EXTERNAL://kafka1.devops:19092,JWT://kafka1.devops:9092,INTROSPECT://kafka1.devops:9093,JWTPLAIN://kafka1.devops:9094,INTROSPECTPLAIN://kafka1.devops:9095,JWTREFRESH://kafka1.devops:9096,PLAIN://kafka1.devops:${KAFKA_PLAIN_PORT_1},SCRAM://kafka1.devops:9101
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXTERNAL:SASL_PLAINTEXT,JWT:SASL_PLAINTEXT,INTROSPECT:SASL_PLAINTEXT,JWTPLAIN:SASL_PLAINTEXT,INTROSPECTPLAIN:SASL_PLAINTEXT,JWTREFRESH:SASL_PLAINTEXT,PLAIN:SASL_PLAINTEXT,SCRAM:SASL_PLAINTEXT
      KAFKA_SASL_ENABLED_MECHANISMS: OAUTHBEARER
      KAFKA_INTER_BROKER_LISTENER_NAME: JWT
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: OAUTHBEARER
      KAFKA_PRINCIPAL_BUILDER_CLASS: io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder

      # Common settings for all the listeners
      # username extraction from JWT token claim
      OAUTH_USERNAME_CLAIM: preferred_username
      OAUTH_CONNECT_TIMEOUT_SECONDS: 20

      # Configuration of individual listeners
      # NOTE: `$$` in the values below is Compose's escape for a literal `$`.
      KAFKA_LISTENER_NAME_JWT_OAUTHBEARER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required    oauth.jwks.endpoint.uri=\"${KEYCLOAK_CERTS}\"    oauth.valid.issuer.uri=\"${KEYCLOAK_ISSUER}\"    oauth.token.endpoint.uri=\"${KEYCLOAK_TOKEN_URI}\"    oauth.client.id=\"${KEYCLOAK_KAFKA_CLIENT}\"    oauth.client.secret=\"${KEYCLOAK_KAFKA_SECRET}\"    oauth.groups.claim=\"$$.realm_access.roles\" ;
      KAFKA_LISTENER_NAME_JWT_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
      KAFKA_LISTENER_NAME_JWT_OAUTHBEARER_SASL_LOGIN_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

      KAFKA_LISTENER_NAME_EXTERNAL_OAUTHBEARER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required    oauth.jwks.endpoint.uri=\"${KEYCLOAK_CERTS}\"    oauth.valid.issuer.uri=\"${KEYCLOAK_ISSUER}\"    oauth.token.endpoint.uri=\"${KEYCLOAK_TOKEN_URI}\"    oauth.client.id=\"${KEYCLOAK_KAFKA_CLIENT}\"    oauth.client.secret=\"${KEYCLOAK_KAFKA_SECRET}\"    oauth.groups.claim=\"$$.realm_access.roles\" ;
      KAFKA_LISTENER_NAME_EXTERNAL_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
      KAFKA_LISTENER_NAME_EXTERNAL_OAUTHBEARER_SASL_LOGIN_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler


      KAFKA_LISTENER_NAME_INTROSPECT_OAUTHBEARER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required    oauth.introspection.endpoint.uri=\"${KEYCLOAK_INTROSPECT_URI}\"    oauth.valid.issuer.uri=\"${KEYCLOAK_ISSUER}\"    oauth.client.id=\"${KEYCLOAK_KAFKA_CLIENT}\"    oauth.client.secret=\"${KEYCLOAK_KAFKA_SECRET}\"    unsecuredLoginStringClaim_sub=\"${UNSECURE_CLAIM}\" ;
      KAFKA_LISTENER_NAME_INTROSPECT_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler

      KAFKA_LISTENER_NAME_JWTPLAIN_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_LISTENER_NAME_JWTPLAIN_PLAIN_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required    oauth.jwks.endpoint.uri=\"${KEYCLOAK_CERTS}\"    oauth.valid.issuer.uri=\"${KEYCLOAK_ISSUER}\"    oauth.token.endpoint.uri=\"${KEYCLOAK_TOKEN_URI}\"    oauth.client.id=\"${KEYCLOAK_KAFKA_CLIENT}\"    oauth.client.secret=\"${KEYCLOAK_KAFKA_SECRET}\"    unsecuredLoginStringClaim_sub=\"${UNSECURE_CLAIM}\" ;
      KAFKA_LISTENER_NAME_JWTPLAIN_PLAIN_SASL_SERVER_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler

      KAFKA_LISTENER_NAME_INTROSPECTPLAIN_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_LISTENER_NAME_INTROSPECTPLAIN_PLAIN_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required    oauth.introspection.endpoint.uri=\"${KEYCLOAK_INTROSPECT_URI}\"    oauth.valid.issuer.uri=\"${KEYCLOAK_ISSUER}\"    oauth.token.endpoint.uri=\"${KEYCLOAK_TOKEN_URI}\"    oauth.client.id=\"${KEYCLOAK_KAFKA_CLIENT}\"    oauth.client.secret=\"${KEYCLOAK_KAFKA_SECRET}\"    unsecuredLoginStringClaim_sub=\"${UNSECURE_CLAIM}\" ;
      KAFKA_LISTENER_NAME_INTROSPECTPLAIN_PLAIN_SASL_SERVER_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler

      KAFKA_LISTENER_NAME_JWTREFRESH_OAUTHBEARER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required    oauth.jwks.endpoint.uri=\"${KEYCLOAK_CERTS}\"    oauth.valid.issuer.uri=\"${KEYCLOAK_ISSUER}\"    oauth.token.endpoint.uri=\"${KEYCLOAK_TOKEN_URI}\"    oauth.client.id=\"${KEYCLOAK_KAFKA_CLIENT}\"    oauth.client.secret=\"${KEYCLOAK_KAFKA_SECRET}\"    oauth.jwks.refresh.min.pause.seconds=\"2\"    unsecuredLoginStringClaim_sub=\"${UNSECURE_CLAIM}\" ;
      KAFKA_LISTENER_NAME_JWTREFRESH_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
      # Enable re-authentication
      KAFKA_LISTENER_NAME_JWTREFRESH_OAUTHBEARER_CONNECTIONS_MAX_REAUTH_MS: 45000

      KAFKA_LISTENER_NAME_PLAIN_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_LISTENER_NAME_PLAIN_PLAIN_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required    username=\"${KAFKA_ADMIN_USERNAME}\"    password=\"${KAFKA_ADMIN_PASSWORD}\"    user_admin=\"${KAFKA_ADMIN_PASSWORD}\"    user_monitor=\"${KAFKA_MONITOR_PASSWORD}\" ;

      KAFKA_LISTENER_NAME_SCRAM_SASL_ENABLED_MECHANISMS: SCRAM-SHA-512
      KAFKA_LISTENER_NAME_SCRAM_SCRAM__2DSHA__2D512_SASL_JAAS_CONFIG: org.apache.kafka.common.security.scram.ScramLoginModule required    username=\"${KAFKA_ADMIN_USERNAME}\"    password=\"${KAFKA_ADMIN_PASSWORD}\" ;

      # Authorizer configuration
      KAFKA_AUTHORIZER_CLASS_NAME: io.strimzi.kafka.oauth.server.authorizer.KeycloakAuthorizer

      KAFKA_STRIMZI_AUTHORIZATION_TOKEN_ENDPOINT_URI: ${KEYCLOAK_TOKEN_URI}
      KAFKA_STRIMZI_AUTHORIZATION_CLIENT_ID: ${KEYCLOAK_KAFKA_CLIENT}
      KAFKA_STRIMZI_AUTHORIZATION_CLIENT_SECRET: ${KEYCLOAK_KAFKA_SECRET}
      KAFKA_STRIMZI_AUTHORIZATION_KAFKA_CLUSTER_NAME: ${KAFKA_CLUSTER}
      # Boolean-looking values are quoted so they reach the container as the
      # strings "true"/"false" rather than as YAML booleans.
      KAFKA_STRIMZI_AUTHORIZATION_DELEGATE_TO_KAFKA_ACL: "true"
      KAFKA_STRIMZI_AUTHORIZATION_READ_TIMEOUT_SECONDS: 45

      # Parameters controlling the refreshing of grants
      KAFKA_STRIMZI_AUTHORIZATION_GRANTS_REFRESH_POOL_SIZE: 4

      # Grants refresh period is set to 45 seconds here.
      # (An earlier note on this setting said 10 seconds, the value needed by
      # keycloak-authz*-tests/**/RefreshTest; this file uses 45.)
      KAFKA_STRIMZI_AUTHORIZATION_GRANTS_REFRESH_PERIOD_SECONDS: 45

      # If a grants fetch fails, immediately perform one retry
      KAFKA_STRIMZI_AUTHORIZATION_HTTP_RETRIES: 1

      # Use grants fetched for another session if available
      KAFKA_STRIMZI_AUTHORIZATION_REUSE_GRANTS: "true"

      KAFKA_STRIMZI_AUTHORIZATION_ENABLE_METRICS: "true"

      KAFKA_SUPER_USERS: User:${KAFKA_ADMIN_USERNAME};User:${KAFKA_SERVICE_ACCOUNT}

      # OAuth metrics configuration

      OAUTH_ENABLE_METRICS: "true"
      #   When enabling metrics we also have to explicitly configure JmxReporter to have metrics available in JMX
      #   The following value will be available as env var STRIMZI_OAUTH_METRIC_REPORTERS
      STRIMZI_OAUTH_METRIC_REPORTERS: org.apache.kafka.common.metrics.JmxReporter

      #   The following value will turn to 'strimzi.oauth.metric.reporters=...' in 'strimzi.properties' file
      #   However, that won't work as the value may be filtered to the component that happens to initialise OAuthMetrics
      #- KAFKA_STRIMZI_OAUTH_METRIC_REPORTERS=org.apache.kafka.common.metrics.JmxReporter


      # Other configuration
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_SESSION_TIMEOUT_MS: 400000
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 450000
      KAFKA_REQUEST_TIMEOUT_MS: 450000
      KAFKA_MAX_REQUEST_SIZE: 100000000
      KAFKA_MESSAGE_MAX_BYTES: 100000000
      KAFKA_MAX_PARTITION_FETCH_BYTES: 100000000

      # For start.sh script to know where the keycloak is listening
      KEYCLOAK_URI: ${KEYCLOAK_URI}
      KEYCLOAK_HOST: ${KEYCLOAK_HOST}
      REALM: ${KEYCLOAK_REALM}
      KAFKA_ADMIN_USERNAME: ${KAFKA_ADMIN_USERNAME}
      KAFKA_ADMIN_PASSWORD: ${KAFKA_ADMIN_PASSWORD}


  # Kafka broker 2. Merges every key from the kafka1 service (`*kafka`) and
  # overrides only the keys listed here; the environment likewise extends
  # `*kafka-environment` and overrides the broker-specific entries.
  kafka2:
    <<: *kafka
    hostname: kafka2.devops
    container_name: kafka2
    ports:
      # EXTERNAL listener port (a stray `}` previously made this mapping invalid).
      - "29092:29092"
      - "${KAFKA_PLAIN_PORT_2}:${KAFKA_PLAIN_PORT_2}"
    volumes:
      - ./docker/kafka/config:/opt/kafka/config/strimzi
      - ./docker/kafka/scripts:/opt/kafka/strimzi
      - ./kafka-data-new/broker2/data:/var/lib/kafka/data
    environment:
      <<: *kafka-environment
      KAFKA_BROKER_ID: 2
      KAFKA_ADVERTISED_LISTENERS: JWT://kafka2.devops:9092,EXTERNAL://kafka2.devops:29092,PLAIN://kafka2.devops:${KAFKA_PLAIN_PORT_2}
      KAFKA_LISTENERS: EXTERNAL://kafka2.devops:29092,JWT://kafka2.devops:9092,INTROSPECT://kafka2.devops:9093,JWTPLAIN://kafka2.devops:9094,INTROSPECTPLAIN://kafka2.devops:9095,JWTREFRESH://kafka2.devops:9096,PLAIN://kafka2.devops:${KAFKA_PLAIN_PORT_2},SCRAM://kafka2.devops:9101


  # Kafka broker 3. Merges every key from the kafka1 service (`*kafka`) and
  # overrides only the broker-specific ones below.
  kafka3:
    <<: *kafka
    container_name: kafka3
    hostname: kafka3.devops
    volumes:
      - ./docker/kafka/config:/opt/kafka/config/strimzi
      - ./docker/kafka/scripts:/opt/kafka/strimzi
      - ./kafka-data-new/broker3/data:/var/lib/kafka/data
    ports:
      # EXTERNAL listener port, then the PLAIN listener port.
      - "39092:39092"
      - "${KAFKA_PLAIN_PORT_3}:${KAFKA_PLAIN_PORT_3}"
    environment:
      # Start from the shared broker environment, then override per-broker values.
      <<: *kafka-environment
      KAFKA_BROKER_ID: "3"
      KAFKA_LISTENERS: EXTERNAL://kafka3.devops:39092,JWT://kafka3.devops:9092,INTROSPECT://kafka3.devops:9093,JWTPLAIN://kafka3.devops:9094,INTROSPECTPLAIN://kafka3.devops:9095,JWTREFRESH://kafka3.devops:9096,PLAIN://kafka3.devops:${KAFKA_PLAIN_PORT_3},SCRAM://kafka3.devops:9101
      KAFKA_ADVERTISED_LISTENERS: JWT://kafka3.devops:9092,EXTERNAL://kafka3.devops:39092,PLAIN://kafka3.devops:${KAFKA_PLAIN_PORT_3}


  # Web UI for the cluster; connects to the brokers' PLAIN listeners using
  # SASL/PLAIN with the admin credentials.
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    restart: always
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    networks:
      - dev_net
    ports:
      - "${CMAK_PORT}:8080"
    environment:
      # UI login
      AUTH_TYPE: 'LOGIN_FORM'
      SPRING_SECURITY_USER_NAME: ${CMAK_USERNAME}
      SPRING_SECURITY_USER_PASSWORD: ${CMAK_PASSWORD}
      # Cluster connection
      KAFKA_CLUSTERS_0_NAME: ${KAFKA_CLUSTER}
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1.devops:${KAFKA_PLAIN_PORT_1},kafka2.devops:${KAFKA_PLAIN_PORT_2},kafka3.devops:${KAFKA_PLAIN_PORT_3}
      KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_PLAINTEXT
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: PLAIN
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${KAFKA_ADMIN_USERNAME}" password="${KAFKA_ADMIN_PASSWORD}";'

It works very well on the Windows operating system, but when I try to run it on Linux I get errors like: <data-plane-kafka-network-thread-2-ListenerName(EXTERNAL)-SASL_PLAINTEXT-2> [SocketServer listenerType=ZK_BROKER, nodeId=2] Failed authentication with /INTERNAL_IP (channelId=INTERNAL_IP :29092-INTERNAL_IP :56818-15) (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector), and the brokers are not available.

Is there any Linux-specific configuration required? Any suggestions?


Solution

  • Solved with two changes:

    First, I disabled the network by commenting it out:

    #networks:
    #  - dev_net
    

    Second, I specified the external IP address in advertised.listeners:

    KAFKA_ADVERTISED_LISTENERS: JWT://kafka1.devops:9092,EXTERNAL://${KAFKA_EXTERNAL_URL}:19092,PLAIN://kafka1.devops:${KAFKA_PLAIN_PORT_1}