Tags: amazon-web-services, apache-kafka, apache-camel, azure-eventhub, amazon-msk

How To Run Kafka Camel Connectors On Amazon MSK


Context: I followed this link on setting up AWS MSK and testing a producer and consumer, and everything is set up and working correctly: I can send and receive messages between two separate EC2 instances that both use the same Kafka cluster (my MSK cluster). Now I would like to build a data pipeline all the way from Event Hubs to AWS Firehose, of the form:

Azure Eventhub -> Eventhub-to-Kafka Camel Connector -> AWS MSK -> Kafka-to-Kinesis-Firehose Camel Connector -> AWS Kinesis Firehose

I got this working without MSK (with plain self-managed Kafka), but for reasons I can't go into I now need to use MSK, and I can't get it working.

Problem: When I try to start the two Camel connectors against AWS MSK, I get the following errors:


These are the two connectors in question:

  1. AWS Kinesis Firehose to Kafka Connector (Kafka -> Consumer)
  2. Azure Eventhubs to Kafka Connector (Producer -> Kafka)

Goal: Get these connectors working with MSK, just as they worked when they were connected directly to plain Kafka.

Here is the error from the Firehose connector:

Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
com.amazonaws.services.kinesisfirehose.model.AmazonKinesisFirehoseException: The security token included in the request is invalid

And here is the error from the Azure connector:

[2021-05-04 14:09:56,848] WARN Load balancing for event processor failed - If you are using a StorageSharedKeyCredential, and the server returned an error message that says 'Signature did not match', you can compare the string to sign with the one generated by the SDK. To log the string to sign, pass in the context key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate method call.
If you are using a SAS token, and the server returned an error message that says 'Signature did not match', you can compare the string to sign with the one generated by the SDK. To log the string to sign, pass in the context key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate generateSas method call.
Please remember to disable 'Azure-Storage-Log-String-To-Sign' before going to production as this string can potentially contain PII.
Status code 403, "<?xml version="1.0" encoding="utf-8"?><Error><Code>AuthorizationFailure</Code><Message>This request is not authorized to perform this operation.
Time:2021-05-04T14:09:56.7148317Z</Message></Error>" (com.azure.messaging.eventhubs.PartitionBasedLoadBalancer:344)
[2021-05-04 14:09:56,858] Error was received while reading the incoming data. The connection will be closed. (reactor.netty.channel.ChannelOperationsHandler:319)
java.lang.NoSuchMethodError: org.apache.camel.component.azure.eventhubs.EventHubsConsumer.createExchange(Z)Lorg/apache/camel/Exchange;
        at org.apache.camel.component.azure.eventhubs.EventHubsConsumer.createAzureEventHubExchange(EventHubsConsumer.java:93)

Solution

  • MSK doesn't offer Kafka Connect as a service. You'll need to run Kafka Connect yourself, either on your own machine or on other AWS compute resources (such as EC2), and point it at the MSK cluster. From there, install the Camel connector plugins on that Connect worker.
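As a rough illustration of that setup, here is a minimal Python sketch (not an official recipe) that renders a distributed-mode worker config pointing at MSK and builds the JSON body for registering a connector through the Kafka Connect REST API (`POST /connectors`). Assumptions: the broker string, `/opt/connectors` plugin directory, `camel-connect` group id, topic names and the connector class name are all hypothetical placeholders; take the real connector class and its settings from the Camel Kafka Connector docs for the version you install. The `tls` option assumes a TLS-only MSK listener (port 9094).

```python
import json
import urllib.request


def worker_properties(bootstrap: str, plugin_path: str,
                      replication_factor: int = 3, tls: bool = False) -> str:
    """Render a minimal connect-distributed.properties for a self-run worker on MSK."""
    props = {
        "bootstrap.servers": bootstrap,
        "group.id": "camel-connect",  # hypothetical Connect cluster id
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        # Internal topics that Kafka Connect (distributed mode) keeps in the cluster.
        "config.storage.topic": "connect-configs",
        "offset.storage.topic": "connect-offsets",
        "status.storage.topic": "connect-status",
        # Directory holding the unpacked Camel connector archives (hypothetical path).
        "plugin.path": plugin_path,
    }
    # The replication factor must not exceed the number of MSK brokers.
    for topic in ("config", "offset", "status"):
        props[f"{topic}.storage.replication.factor"] = str(replication_factor)
    if tls:
        # Assumes a TLS-only listener: the worker itself and the producers/consumers
        # it creates for connectors each need the protocol set.
        for prefix in ("", "producer.", "consumer."):
            props[f"{prefix}security.protocol"] = "SSL"
    return "\n".join(f"{key}={value}" for key, value in props.items()) + "\n"


def connector_request(name: str, connector_class: str, settings: dict) -> bytes:
    """Build the JSON body for POST /connectors on the Kafka Connect REST API."""
    config = {"connector.class": connector_class, **settings}
    return json.dumps({"name": name, "config": config}).encode("utf-8")


def register(connect_url: str, body: bytes) -> int:
    """Submit a connector to a running worker and return the HTTP status code."""
    request = urllib.request.Request(
        f"{connect_url}/connectors", data=body,
        headers={"Content-Type": "application/json"}, method="POST")
    with urllib.request.urlopen(request) as response:
        return response.status


if __name__ == "__main__":
    # Placeholder broker string: use the one `aws kafka get-bootstrap-brokers` reports.
    print(worker_properties("b-1.example.amazonaws.com:9094", "/opt/connectors",
                            replication_factor=2, tls=True))
    # Placeholder class name: replace with the class shipped in your Camel connector.
    print(connector_request("firehose-sink", "com.example.FirehoseSinkConnector",
                            {"topics": "eventhub-events", "tasks.max": "1"}).decode())
```

You would save the rendered properties to a file, start the worker with `bin/connect-distributed.sh` and that file, and then call `register("http://localhost:8083", body)` (8083 is the default REST port) once for each of the two connectors. Keeping the Camel archives in their own subdirectories under `plugin.path` lets Kafka Connect load each plugin in isolation.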