I have an Amazon MSK Kafka 3.7.x cluster in ZooKeeper mode which works well. Now I am trying a new Kafka cluster in KRaft mode.
However, on this KRaft-mode cluster, the same connector fails to start.
Here is the full Kafka Connect error log.
Note that it contains the following error; only the built-in classpath plugins are listed, which suggests the custom plugin was never loaded:
org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches com.snowflake.kafka.connector.SnowflakeSinkConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.7.1', encodedVersion=2.7.1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.7.1', encodedVersion=2.7.1, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.7.1', encodedVersion=2.7.1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source', location='classpath'}
Both Kafka clusters use the same plugin, which is a ZIP file containing these JARs. You can see snowflake-kafka-connector-2.2.2.jar, which contains com.snowflake.kafka.connector.SnowflakeSinkConnector inside.
For both Kafka clusters, the connector and plugin are created by the same Terraform code, meaning all parameters are the same.
The only difference is kafka_version: 3.7.x for the ZooKeeper cluster and 3.7.x.kraft for the KRaft cluster.
resource "aws_msk_cluster" "hm_amazon_msk_cluster" {
cluster_name = var.amazon_msk_cluster_name
kafka_version = "3.7.x.kraft" # <- only this line is different, the other one is `3.7.x`
number_of_broker_nodes = var.kafka_broker_number
storage_mode = "TIERED"
broker_node_group_info {
instance_type = "kafka.m7g.large"
security_groups = [var.amazon_vpc_security_group_id]
client_subnets = var.amazon_vpc_subnet_ids
}
logging_info {
broker_logs {
s3 {
enabled = true
bucket = var.kafka_broker_log_s3_bucket_name
prefix = "brokers"
}
}
}
encryption_info {
encryption_at_rest_kms_key_arn = var.aws_kms_key_arn
}
client_authentication {
sasl {
iam = true
}
}
}
resource "aws_s3_object" "hm_amazon_s3_object" {
bucket = var.s3_bucket_name
key = var.s3_key
source = var.local_file_path
etag = filemd5(var.local_file_path)
}
resource "aws_mskconnect_custom_plugin" "hm_amazon_msk_plugin" {
name = var.amazon_msk_plugin_name
content_type = "ZIP"
location {
s3 {
bucket_arn = var.s3_bucket_arn
file_key = var.amazon_msk_plugin_s3_key
}
}
}
resource "aws_mskconnect_connector" "my_amazon_msk_connector" {
name = var.amazon_msk_connector_name
kafkaconnect_version = "2.7.1"
capacity {
autoscaling {
mcu_count = 1
min_worker_count = 1
max_worker_count = 2
scale_in_policy {
cpu_utilization_percentage = 40
}
scale_out_policy {
cpu_utilization_percentage = 95
}
}
}
# https://docs.snowflake.com/en/user-guide/kafka-connector-install#label-kafka-properties
# https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-kafka
connector_configuration = {
"connector.class" = "com.snowflake.kafka.connector.SnowflakeSinkConnector"
"tasks.max" = 4
"topics" = var.kafka_topic_name
"buffer.count.records" = 10000
"buffer.flush.time" = 5
"buffer.size.bytes" = 20000000
"snowflake.url.name" = "xx.snowflakecomputing.com"
"snowflake.user.name" = var.snowflake_user_name
"snowflake.private.key" = var.snowflake_private_key
"snowflake.private.key.passphrase" = var.snowflake_private_key_passphrase
"snowflake.role.name" = var.snowflake_role_name
"snowflake.ingestion.method" = "SNOWPIPE_STREAMING"
"snowflake.enable.schematization" = true
"snowflake.database.name" = var.snowflake_database_name
"snowflake.schema.name" = var.snowflake_schema_name
"value.converter" = "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url" = var.confluent_schema_registry_url
"errors.log.enable" = true
"errors.tolerance" = "all"
"jmx" = true
}
kafka_cluster {
apache_kafka_cluster {
bootstrap_servers = var.amazon_msk_cluster_bootstrap_servers
vpc {
security_groups = [var.amazon_vpc_security_group_id]
subnets = var.amazon_vpc_subnet_ids
}
}
}
kafka_cluster_client_authentication {
authentication_type = "IAM"
}
kafka_cluster_encryption_in_transit {
encryption_type = "TLS"
}
plugin {
custom_plugin {
arn = aws_mskconnect_custom_plugin.hm_amazon_msk_plugin.arn
revision = aws_mskconnect_custom_plugin.hm_amazon_msk_plugin.latest_revision
}
}
log_delivery {
worker_log_delivery {
s3 {
bucket = var.msk_log_s3_bucket_name
prefix = var.msk_log_s3_key
enabled = true
}
}
}
service_execution_role_arn = var.amazon_msk_connector_iam_role_arn
}
Is there anything else I need to pay attention to when using Kafka in KRaft mode? Or could this be an MSK bug? Thanks!
Hmm, this is a little awkward. After adding depends_on between the resources, as shown below, it now works well for Kafka in KRaft mode. Nothing else changed.
I think this is because Terraform creates resources concurrently by default (up to 10 in parallel). Without an explicit ordering, the custom plugin (and then the connector) may be created before the ZIP file has been fully uploaded to S3.
resource "aws_s3_object" "hm_amazon_s3_object" {
# ...
}
resource "aws_mskconnect_custom_plugin" "hm_amazon_msk_plugin" {
# ...
depends_on = [
aws_s3_object.hm_amazon_s3_object
]
}
resource "aws_mskconnect_connector" "my_amazon_msk_connector" {
# ...
depends_on = [
aws_mskconnect_custom_plugin.hm_amazon_msk_plugin
]
}
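For what it's worth, the plugin-to-connector ordering was already implicit, since the connector references aws_mskconnect_custom_plugin.hm_amazon_msk_plugin.arn; the missing edge was only between the S3 object and the plugin. That edge can also be expressed implicitly instead of with depends_on. A minimal sketch, assuming the same resource names as above: derive file_key from the aws_s3_object resource rather than from a variable, and Terraform infers the ordering on its own.

resource "aws_mskconnect_custom_plugin" "hm_amazon_msk_plugin" {
  name         = var.amazon_msk_plugin_name
  content_type = "ZIP"
  location {
    s3 {
      bucket_arn = var.s3_bucket_arn
      # Referencing the aws_s3_object attribute instead of a plain variable
      # makes Terraform finish the upload before creating the plugin.
      file_key = aws_s3_object.hm_amazon_s3_object.key
    }
  }
}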