apache-kafka terraform apache-kafka-connect aws-msk kraft

MSK connector failed to find class in MSK cluster with KRaft mode


I have a Kafka 3.7.x cluster using ZooKeeper mode, which works well. Now I am trying a new Kafka cluster in KRaft mode.

However, for this Kafka cluster in KRaft mode, the same connector failed to start.


The full Kafka connector error log includes:

org.apache.kafka.connect.errors.ConnectException: Failed to find any
class that implements Connector and which name matches
com.snowflake.kafka.connector.SnowflakeSinkConnector, available
connectors are: PluginDesc{klass=class
org.apache.kafka.connect.file.FileStreamSinkConnector,
name='org.apache.kafka.connect.file.FileStreamSinkConnector',
version='2.7.1', encodedVersion=2.7.1, type=sink, typeName='sink',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.file.FileStreamSourceConnector,
name='org.apache.kafka.connect.file.FileStreamSourceConnector',
version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.mirror.MirrorCheckpointConnector,
name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector',
version='1', encodedVersion=1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.mirror.MirrorHeartbeatConnector,
name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector',
version='1', encodedVersion=1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.mirror.MirrorSourceConnector,
name='org.apache.kafka.connect.mirror.MirrorSourceConnector',
version='1', encodedVersion=1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.tools.MockConnector,
name='org.apache.kafka.connect.tools.MockConnector', version='2.7.1',
encodedVersion=2.7.1, type=connector, typeName='connector',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.tools.MockSinkConnector,
name='org.apache.kafka.connect.tools.MockSinkConnector',
version='2.7.1', encodedVersion=2.7.1, type=sink, typeName='sink',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.tools.MockSourceConnector,
name='org.apache.kafka.connect.tools.MockSourceConnector',
version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.tools.SchemaSourceConnector,
name='org.apache.kafka.connect.tools.SchemaSourceConnector',
version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.tools.VerifiableSinkConnector,
name='org.apache.kafka.connect.tools.VerifiableSinkConnector',
version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.tools.VerifiableSourceConnector,
name='org.apache.kafka.connect.tools.VerifiableSourceConnector',
version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source',
location='classpath'}

Both Kafka clusters use the same plugin, a ZIP file of jars. It includes snowflake-kafka-connector-2.2.2.jar, which contains com.snowflake.kafka.connector.SnowflakeSinkConnector.

For these two Kafka clusters, the connector and plugin use the same Terraform code, so all parameters are identical. The only difference is kafka_version in the Kafka cluster: 3.7.x versus 3.7.x.kraft.

resource "aws_msk_cluster" "hm_amazon_msk_cluster" {
  cluster_name           = var.amazon_msk_cluster_name
  kafka_version          = "3.7.x.kraft" # <- only this line is different, the other one is `3.7.x`
  number_of_broker_nodes = var.kafka_broker_number
  storage_mode           = "TIERED"
  broker_node_group_info {
    instance_type   = "kafka.m7g.large"
    security_groups = [var.amazon_vpc_security_group_id]
    client_subnets  = var.amazon_vpc_subnet_ids
  }
  logging_info {
    broker_logs {
      s3 {
        enabled = true
        bucket  = var.kafka_broker_log_s3_bucket_name
        prefix  = "brokers"
      }
    }
  }
  encryption_info {
    encryption_at_rest_kms_key_arn = var.aws_kms_key_arn
  }
  client_authentication {
    sasl {
      iam = true
    }
  }
}
resource "aws_s3_object" "hm_amazon_s3_object" {
  bucket = var.s3_bucket_name
  key    = var.s3_key
  source = var.local_file_path
  etag   = filemd5(var.local_file_path)
}
resource "aws_mskconnect_custom_plugin" "hm_amazon_msk_plugin" {
  name         = var.amazon_msk_plugin_name
  content_type = "ZIP"
  location {
    s3 {
      bucket_arn = var.s3_bucket_arn
      file_key   = var.amazon_msk_plugin_s3_key
    }
  }
}
resource "aws_mskconnect_connector" "my_amazon_msk_connector" {
  name                 = var.amazon_msk_connector_name
  kafkaconnect_version = "2.7.1"
  capacity {
    autoscaling {
      mcu_count        = 1
      min_worker_count = 1
      max_worker_count = 2
      scale_in_policy {
        cpu_utilization_percentage = 40
      }
      scale_out_policy {
        cpu_utilization_percentage = 95
      }
    }
  }
  # https://docs.snowflake.com/en/user-guide/kafka-connector-install#label-kafka-properties
  # https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-kafka
  connector_configuration = {
    "connector.class"                  = "com.snowflake.kafka.connector.SnowflakeSinkConnector"
    "tasks.max"                        = 4
    "topics"                           = var.kafka_topic_name
    "buffer.count.records"             = 10000
    "buffer.flush.time"                = 5
    "buffer.size.bytes"                = 20000000
    "snowflake.url.name"               = "xx.snowflakecomputing.com"
    "snowflake.user.name"              = var.snowflake_user_name
    "snowflake.private.key"            = var.snowflake_private_key
    "snowflake.private.key.passphrase" = var.snowflake_private_key_passphrase
    "snowflake.role.name"              = var.snowflake_role_name
    "snowflake.ingestion.method"       = "SNOWPIPE_STREAMING"
    "snowflake.enable.schematization"  = true
    "snowflake.database.name"          = var.snowflake_database_name
    "snowflake.schema.name"            = var.snowflake_schema_name
    "value.converter"                     = "io.confluent.connect.avro.AvroConverter"
    "value.converter.schema.registry.url" = var.confluent_schema_registry_url
    "errors.log.enable"                   = true
    "errors.tolerance"                    = "all"
    "jmx"                                 = true
  }
  kafka_cluster {
    apache_kafka_cluster {
      bootstrap_servers = var.amazon_msk_cluster_bootstrap_servers
      vpc {
        security_groups = [var.amazon_vpc_security_group_id]
        subnets         = var.amazon_vpc_subnet_ids
      }
    }
  }
  kafka_cluster_client_authentication {
    authentication_type = "IAM"
  }
  kafka_cluster_encryption_in_transit {
    encryption_type = "TLS"
  }
  plugin {
    custom_plugin {
      arn      = aws_mskconnect_custom_plugin.hm_amazon_msk_plugin.arn
      revision = aws_mskconnect_custom_plugin.hm_amazon_msk_plugin.latest_revision
    }
  }
  log_delivery {
    worker_log_delivery {
      s3 {
        bucket  = var.msk_log_s3_bucket_name
        prefix  = var.msk_log_s3_key
        enabled = true
      }
    }
  }
  service_execution_role_arn = var.amazon_msk_connector_iam_role_arn
}

Is there anything else I need to pay attention to when using Kafka in KRaft mode? Or could this be an MSK bug? Thanks!


Solution

  • Hmm, this is a little awkward. After adding depends_on between the resources, it now works well for Kafka in KRaft mode. Nothing else changed.

    I think this is because Terraform creates resources concurrently by default unless one depends on another. Maybe the ZIP had not been fully uploaded to S3 when the Kafka connector started.

    resource "aws_s3_object" "hm_amazon_s3_object" {
      # ...
    }
    
    resource "aws_mskconnect_custom_plugin" "hm_amazon_msk_plugin" {
      # ...
      depends_on = [
        aws_s3_object.hm_amazon_s3_object
      ]
    }
    
    resource "aws_mskconnect_connector" "my_amazon_msk_connector" {
      # ...
      depends_on = [
        aws_mskconnect_custom_plugin.hm_amazon_msk_plugin
      ]
    }
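
    As a possible alternative to an explicit depends_on, the ordering can also come from an implicit dependency. In the question's code the connector already references the plugin's arn, so Terraform orders those two by itself. The custom plugin, however, takes its S3 key from a variable, so nothing ties it to the upload. A minimal sketch, assuming var.s3_key and var.amazon_msk_plugin_s3_key point at the same object and that var.s3_bucket_arn is the ARN of the bucket named by var.s3_bucket_name:

    ```hcl
    resource "aws_s3_object" "hm_amazon_s3_object" {
      bucket = var.s3_bucket_name
      key    = var.s3_key
      source = var.local_file_path
      etag   = filemd5(var.local_file_path)
    }

    resource "aws_mskconnect_custom_plugin" "hm_amazon_msk_plugin" {
      name         = var.amazon_msk_plugin_name
      content_type = "ZIP"
      location {
        s3 {
          bucket_arn = var.s3_bucket_arn
          # Assumption: the plugin ZIP is the object uploaded above.
          # Reading the key from that resource, not from a separate variable,
          # makes Terraform finish the upload before creating the plugin.
          file_key = aws_s3_object.hm_amazon_s3_object.key
        }
      }
    }
    ```

    With this reference in place, the depends_on on the plugin should no longer be needed, and the key can no longer drift between the upload and the plugin.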