
Context: I followed this link on setting up AWS MSK and testing a producer and consumer, and the setup is working correctly. I am able to send and receive messages via two separate EC2 instances that both use the same Kafka cluster (my MSK cluster). Now I would like to establish a data pipeline all the way from Eventhubs to AWS Kinesis Firehose, of the following form:

Azure Eventhub -> Eventhub-to-Kafka Camel Connector -> AWS MSK -> Kafka-to-Kinesis-Firehose Camel Connector -> AWS Kinesis Firehose

I was able to do this successfully without MSK (via regular old Kafka), but for unstated reasons I now need to use MSK, and I can't get it working.

Problem: When I try to start the two Camel connectors I am using against AWS MSK, I get the following errors:


These are the two connectors in question:

  1. Kafka to AWS Kinesis Firehose Connector (Kafka -> Consumer)
  2. Azure Eventhubs to Kafka Connector (Producer -> Kafka)

Goal: Get these connectors to work with MSK, as they did when they were working directly with Kafka.

Here is the issue for Firehose:

Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
com.amazonaws.services.kinesisfirehose.model.AmazonKinesisFirehoseException: The security token included in the request is invalid

Here is the one for Azure:

[2021-05-04 14:09:56,848] WARN Load balancing for event processor failed - If you are using a StorageSharedKeyCredential, and the server returned an error message that says 'Signature did not match', you can compare the string to sign with the one generated by the SDK. To log the string to sign, pass in the context key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate method call.
If you are using a SAS token, and the server returned an error message that says 'Signature did not match', you can compare the string to sign with the one generated by the SDK. To log the string to sign, pass in the context key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate generateSas method call.
Please remember to disable 'Azure-Storage-Log-String-To-Sign' before going to production as this string can potentially contain PII.
Status code 403, "<?xml version="1.0" encoding="utf-8"?><Error><Code>AuthorizationFailure</Code><Message>This request is not authorized to perform this operation.
Time:2021-05-04T14:09:56.7148317Z</Message></Error>" (com.azure.messaging.eventhubs.PartitionBasedLoadBalancer:344)
[2021-05-04 14:09:56,858] Error was received while reading the incoming data. The connection will be closed. (reactor.netty.channel.ChannelOperationsHandler:319)
java.lang.NoSuchMethodError: org.apache.camel.component.azure.eventhubs.EventHubsConsumer.createExchange(Z)Lorg/apache/camel/Exchange;
        at org.apache.camel.component.azure.eventhubs.EventHubsConsumer.createAzureEventHubExchange(EventHubsConsumer.java:93)
OneCricketeer
Mike R

2 Answers


MSK doesn't offer Kafka Connect as a service. You'll need to install it on your own computer or on other AWS compute resources. From there, you need to install the Camel connector plugins.
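Since Connect has to be self-managed here, the worker is configured through `connect-distributed.properties`. The Java sketch below only builds and prints the minimum keys a distributed worker needs, plus `plugin.path` for the Camel JARs. The broker address, group id, converter choice, topic names and plugin directory are hypothetical placeholders, and any TLS settings required by the MSK listener would still have to be added to the same file.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MskConnectWorkerConfig {

    // Builds the minimal settings for a distributed Connect worker.
    // All values are placeholders; substitute your own MSK brokers and paths.
    public static Map<String, String> workerConfig(String bootstrap, String pluginPath) {
        Map<String, String> cfg = new LinkedHashMap<>(); // keeps insertion order for readable output
        // Connect has no --bootstrap-server flag; the broker list goes here.
        cfg.put("bootstrap.servers", bootstrap);
        // Workers that share this id form one Connect cluster.
        cfg.put("group.id", "connect-cluster");
        cfg.put("key.converter", "org.apache.kafka.connect.storage.StringConverter");
        cfg.put("value.converter", "org.apache.kafka.connect.storage.StringConverter");
        // Internal topics where distributed mode stores connector configs, offsets and status.
        cfg.put("config.storage.topic", "connect-configs");
        cfg.put("offset.storage.topic", "connect-offsets");
        cfg.put("status.storage.topic", "connect-status");
        // Parent directory; each Camel connector is unpacked into its own subdirectory.
        cfg.put("plugin.path", pluginPath);
        return cfg;
    }

    // Renders the map as key=value lines (no escaping; fine for these plain values).
    public static String render(Map<String, String> cfg) {
        StringBuilder sb = new StringBuilder();
        cfg.forEach((k, v) -> sb.append(k).append('=').append(v).append('\n'));
        return sb.toString();
    }

    public static void main(String[] args) {
        String bootstrap = "b-1.example.kafka.us-east-1.amazonaws.com:9094"; // hypothetical
        System.out.print(render(workerConfig(bootstrap, "/opt/kafka/plugins")));
    }
}
```

The printed file would then be passed to `bin/connect-distributed.sh` on whichever machine runs the worker.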

OneCricketeer
  • Thanks. So what I am confused about is that I am using the --bootstrap-server BootstrapBrokerStringTls per step 6 #3 in the docs, running connect-distributed locally, and then trying to run Camel Producer and Consumer Connector Plugins. But I keep getting errors when running the Camel Connectors. Is that the right way to run Camel Connector Plugins using Amazon MSK? Not much documentation to help. – Mike R May 04 '21 at 13:34
  • Connect itself doesn't use `--bootstrap-server`, so I'm not sure what you're referring to there, but putting the MSK address and any relevant certificates in the connect-distributed.properties file is all that's needed. Then update `plugin.path` there as well for the Camel JAR files. Regarding the errors in the question, seems like some version issue, or you're missing some other JAR files, but I've never used Camel, so couldn't say – OneCricketeer May 04 '21 at 13:42
  • So I found this, which will help start Kafka Connect on the MSK. https://amazonmsk-labs.workshop.aws/en/securityencryption/tlsmauth/kafkaconnect.html But then there is the issue of the connectors not working and throwing errors. None of the recommendations work from https://stackoverflow.com/questions/58511736/failed-to-connect-to-and-describe-kafka-cluster-apache-kafka-connect https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-kafka-connector-msk/ – Mike R May 04 '21 at 14:01
  • It's generally recommended not to run Connect on the brokers themselves. In any case, the error you're getting is very Java/plugin specific, and not a Kafka connection issue at all – OneCricketeer May 04 '21 at 14:48
  • Hey, any recommendations for troubleshooting? It looks to be Java specific related to the Kafka Connectors. Would I use another EC2 system then for the connectors? – Mike R May 04 '21 at 14:59
  • The environment isn't going to help. Containers or a physical instance would both allow you to set breakpoints in the code for a remote debugger. But anyways, for Kinesis, it appears to want an IAM key, so EC2 or ECS might help. You might want to find the GitHub/JIRA or Camel project users list to ask about the problem there – OneCricketeer May 04 '21 at 22:39
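A `NoSuchMethodError` like the Azure one usually means a class was compiled against a different version of a dependency than the one loaded at runtime, which fits the version or missing-JAR suspicion in the comments. One way to check is to list every location on a classpath that contains a given class. This is a sketch: the default class names come from the stack trace, except `org.apache.camel.support.DefaultConsumer`, which is assumed to be the consumer base class in Camel 3.x, and the plugin directory in the usage line is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

public class FindClassCopies {

    // Returns every classpath location (JAR or directory) that contains the class file.
    public static List<URL> copiesOf(String className, ClassLoader loader) {
        String resource = className.replace('.', '/') + ".class";
        try {
            return Collections.list(loader.getResources(resource));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Defaults: the class from the stack trace plus its assumed Camel 3.x base class.
        String[] names = args.length > 0 ? args : new String[] {
            "org.apache.camel.component.azure.eventhubs.EventHubsConsumer",
            "org.apache.camel.support.DefaultConsumer"
        };
        ClassLoader loader = FindClassCopies.class.getClassLoader();
        for (String name : names) {
            List<URL> hits = copiesOf(name, loader);
            System.out.println(name + ": " + hits.size() + " location(s)");
            for (URL url : hits) {
                System.out.println("  " + url); // more than one entry means duplicate copies
            }
        }
    }
}
```

For example, `java -cp ".:/opt/kafka/plugins/camel-azure-eventhubs/*" FindClassCopies` approximates what the connector's plugin directory provides. Duplicate hits, or Camel core JARs whose version differs from the connector's, would be the first things to look at.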

Kafka Connect is a framework that works with Kafka (MSK, open source, or any other Kafka distribution). However, it does not come with third-party connectors such as Camel's; those have to be installed as plugins. Kafka Connect itself is bundled with open source Kafka.

As a best practice, never run Kafka Connect on the same servers as your broker nodes. Because they share binaries, tuning one can cause unintended issues on the Kafka brokers. Also, Kafka Connect workers are client applications, and you do not run your Kafka consumer or producer applications on the broker nodes either. So create separate EC2 instance(s) and deploy Kafka Connect there.

Coming to TLS: if you are enabling client-side TLS authentication, you need to look for the TLS bootstrap broker string (`BootstrapBrokerStringTls`).
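With client-side (mutual) TLS, each client also needs a keystore holding its own certificate, and in Connect the settings have to be repeated with the `producer.` and `consumer.` prefixes so the clients created for the connectors use TLS as well. A sketch, assuming MSK's in-VPC TLS listener on port 9094 and JKS stores at hypothetical paths with a placeholder password:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MskTlsClientSettings {

    // Assumes MSK's in-VPC TLS listener (port 9094); the plaintext listener is 9092.
    public static boolean isTlsBootstrap(String bootstrap) {
        for (String hostPort : bootstrap.split(",")) {
            if (!hostPort.trim().endsWith(":9094")) {
                return false;
            }
        }
        return true;
    }

    // Mutual-TLS settings for one client. prefix is "" for the worker itself,
    // or "producer." / "consumer." for the clients Connect creates for connectors.
    public static Map<String, String> mutualTls(String prefix, String bootstrap,
            String truststore, String keystore, String password) {
        if (!isTlsBootstrap(bootstrap)) {
            throw new IllegalArgumentException("expected the TLS bootstrap string (port 9094): " + bootstrap);
        }
        Map<String, String> cfg = new LinkedHashMap<>();
        if (prefix.isEmpty()) {
            cfg.put("bootstrap.servers", bootstrap); // only the worker-level key takes the broker list
        }
        cfg.put(prefix + "security.protocol", "SSL");
        cfg.put(prefix + "ssl.truststore.location", truststore); // trusts the broker certificates
        cfg.put(prefix + "ssl.keystore.location", keystore);     // client certificate for authentication
        cfg.put(prefix + "ssl.keystore.password", password);
        cfg.put(prefix + "ssl.key.password", password);
        return cfg;
    }

    public static void main(String[] args) {
        String bootstrap = "b-1.example.kafka.us-east-1.amazonaws.com:9094"; // hypothetical
        for (String prefix : new String[] {"", "producer.", "consumer."}) {
            mutualTls(prefix, bootstrap, "/tmp/kafka.client.truststore.jks",
                    "/tmp/kafka.client.keystore.jks", "changeit")
                .forEach((k, v) -> System.out.println(k + "=" + v));
        }
    }
}
```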

floating_hammer