
I'm using kafka-python to produce messages for a Kafka 2.2.1 cluster (a managed cluster instance from AWS's MSK service). I'm able to retrieve the bootstrap servers and establish a network connection to them, but no message ever gets through. Instead, after each log message of type A I immediately receive one of type B, and eventually one of type C:

A [INFO]    2019-11-19T15:17:19.603Z    <BrokerConnection ... <connecting> [IPv4 ('10.0.128.56', 9094)]>: Connection complete.
B [ERROR]   2019-11-19T15:17:19.605Z    <BrokerConnection ... <connected> [IPv4 ('10.0.128.56', 9094)]>: socket disconnected
C [ERROR] KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

What causes a broker node to accept a TCP connection from a hopeful producer, but then immediately close it again?

Edit

  • The topic already exists, and kafka-topics.sh --list displays it.

  • I have the same problem with all clients I've used: Kafka's kafka-console-producer.sh, kafka-python, confluent-kafka, and kafkacat

  • The Kafka cluster is in the same VPC as all my other machines, and its security group allows any incoming and outgoing traffic within that VPC.

  • However, it's managed by Amazon's Managed Streaming for Kafka (MSK) service, which means I don't have fine-grained control over the server installation settings (or even know what they are). MSK just publishes the zookeeper and message broker URLs for clients to use.

  • The producer runs as an AWS Lambda function, but the problem persists when I run it on a normal EC2 instance.

  • Permissions are not the issue. I have assigned the lambda role all the AWS permissions it needs (AWS is always very explicit about which operation required which missing permission).

  • Connectivity is not the issue. I can reach the URLs of both the zookeepers and the message brokers with standard telnet. However, issuing commands to the zookeepers works, while issuing commands to the message brokers always eventually fails. Since Kafka uses a binary protocol over TCP, I'm at a loss how to debug the problem further.

Edit

As suggested, I debugged this with

./kafkacat -b $BROKERS -L -d broker

and got:

%7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1574772202.379|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1574772202.379|BROKERFAIL|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: failed: err: Local: Broker transport failure: (errno: Operation now in progress)
%7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features -ApiVersion to
%7|1574772202.380|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state APIVERSION_QUERY -> DOWN

So, is this a kind of mismatch between client and broker API versions? How can I recover from this, bearing in mind that I have no control over the version or the configuration of the Kafka cluster that AWS provides?

fmcmac
Kilian Foth
  • Is your target topic already created? If you could also share your code and configuration that would be helpful. – Giorgos Myrianthous Nov 19 '19 at 16:36
  • @GiorgosMyrianthous No, it's not, but the problem is the same with and without `auto.create.topics.enable`. – Kilian Foth Nov 19 '19 at 17:20
  • Try to create the topic before running your producer. It should work. – Giorgos Myrianthous Nov 19 '19 at 17:21
  • @GiorgosMyrianthous I've now tried pre-populating the topic independently. The problem is the same. – Kilian Foth Nov 20 '19 at 09:09
  • Check `ufw status`. If the status shows any content, disable ufw and try again. – redhatvicky Nov 22 '19 at 06:55
  • To add to that: did you watch the TCP stream to see whether the producer even tried to talk to Kafka, so that you can confirm whether the timeout happens on the cluster side? As a wild guess, can you increase request_timeout_ms and check again? – redhatvicky Nov 22 '19 at 07:00
  • Is the EC2 instance attached to a public/static IP, and has that static IP been configured in advertised.listeners=PLAINTEXT://external-ip: – redhatvicky Nov 22 '19 at 07:32
  • Let me put it a different way: when creating the EC2 instance, under Configure Instance Details, when you choose the network there is an option to enable "Auto-assign Public IP". Did you enable it? – redhatvicky Nov 22 '19 at 07:55
  • Execute the commands below: `bin/kafka-topics.sh --describe --zookeeper :2181 --topic remotetopic` and `bin/kafka-console-consumer.sh --zookeeper :2181 --from-beginning --topic remotetopic`. If there is a problem with the second command, change config/server.properties on the server/broker side: advertised.listeners=PLAINTEXT://192.168.150.150:2181. Reference: https://stackoverflow.com/questions/33584124/kafka-how-to-connect-kafka-console-consumer-to-fetch-remote-broker-topic-conte – redhatvicky Nov 22 '19 at 11:57
  • Did it solve your issue? – Bsquare ℬℬ Nov 23 '19 at 09:54

2 Answers


I think that this is related to TLS encryption. By default, MSK spins up a cluster that accepts both PLAINTEXT and TLS, but if you are grabbing the bootstrap servers programmatically from the cluster, it will only provide you with the TLS ports. If this is the case for you, try using the PLAINTEXT port 9092 instead.
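One way to check whether a listener expects TLS is to attempt a handshake against it. The following is only a rough sketch using the Python standard library; `probe_listener` and its return labels are names made up for this illustration, and a failed handshake can also mean a certificate the client does not trust rather than a plaintext-only port.

```python
import socket
import ssl


def probe_listener(host, port, timeout=5.0):
    """Return (status, detail) where status is 'unreachable',
    'handshake-failed', or 'tls'. Hypothetical helper for diagnosis only."""
    try:
        raw = socket.create_connection((host, port), timeout=timeout)
    except OSError as exc:
        # No TCP connection at all: routing, security group, or closed port.
        return "unreachable", str(exc)

    context = ssl.create_default_context()
    try:
        with context.wrap_socket(raw, server_hostname=host) as tls:
            # The handshake succeeded, so this listener speaks TLS.
            return "tls", str(tls.version())
    except OSError as exc:
        # TCP connected, but the TLS handshake did not complete
        # (ssl.SSLError and socket timeouts are both OSError subclasses).
        raw.close()
        return "handshake-failed", str(exc)
```

If port 9094 reports `tls`, a client configured for PLAINTEXT would be expected to connect and then get dropped, which matches the "Connection complete" followed by "socket disconnected" pattern in the question.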

To authenticate the client for TLS, you need to generate a certificate (see https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html). You would then need to get this certificate onto your lambda and reference it in your Producer configuration.
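For kafka-python, referencing the certificate comes down to a few constructor arguments. This is a minimal sketch, not a verified MSK setup: `tls_producer_config` and `make_producer` are hypothetical helper names, the file paths are placeholders, and whether you need a client certificate and key at all depends on how the cluster's authentication is configured. The `security_protocol`, `ssl_cafile`, `ssl_certfile`, and `ssl_keyfile` options are `KafkaProducer` parameters.

```python
def tls_producer_config(bootstrap_servers, cafile=None, certfile=None, keyfile=None):
    """Build keyword arguments for kafka-python's KafkaProducer over TLS."""
    config = {
        "bootstrap_servers": bootstrap_servers,
        "security_protocol": "SSL",  # the default is PLAINTEXT
    }
    # Only pass the certificate options that were actually provided.
    optional = {
        "ssl_cafile": cafile,
        "ssl_certfile": certfile,
        "ssl_keyfile": keyfile,
    }
    config.update({key: value for key, value in optional.items() if value is not None})
    return config


def make_producer(config):
    """Create the producer; imported lazily so the config can be inspected offline."""
    from kafka import KafkaProducer  # provided by the kafka-python package

    return KafkaProducer(**config)
```

Usage would look like `make_producer(tls_producer_config(brokers, certfile="client.pem", keyfile="client.key"))`, where `brokers` is the TLS bootstrap string and the file names are placeholders bundled with the lambda deployment package.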

If you are able to configure your MSK cluster as PLAINTEXT only, then when you grab the bootstrap servers from the AWS SDK it will give you the PLAINTEXT port, and you should be good.
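When fetching the brokers through the SDK, it can help to derive the client's security protocol from whichever broker string the response actually contains. The sketch below assumes the boto3 `kafka` client and the response keys `BootstrapBrokerString` and `BootstrapBrokerStringTls`; treat those key names as something to verify against the response you get back. `choose_bootstrap` and `fetch_bootstrap` are hypothetical helpers.

```python
def choose_bootstrap(response, prefer_tls=True):
    """Pick (bootstrap_servers, security_protocol) from a
    get_bootstrap_brokers response dict."""
    plaintext = response.get("BootstrapBrokerString")
    tls = response.get("BootstrapBrokerStringTls")

    candidates = [(tls, "SSL"), (plaintext, "PLAINTEXT")]
    if not prefer_tls:
        candidates.reverse()

    for servers, protocol in candidates:
        if servers:
            return servers, protocol
    raise ValueError("response contains no bootstrap broker string")


def fetch_bootstrap(cluster_arn, prefer_tls=True):
    """Query MSK for the cluster's brokers (needs AWS credentials)."""
    import boto3  # imported lazily; only needed for the live call

    response = boto3.client("kafka").get_bootstrap_brokers(ClusterArn=cluster_arn)
    return choose_bootstrap(response, prefer_tls=prefer_tls)
```

The returned protocol can be passed straight to the producer as `security_protocol`, so a TLS-only cluster never ends up paired with a PLAINTEXT client.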

Garrett Hoffman
  • Hmmm. I can connect to the 9094 port, but the 9092 port times out. Also, the get_bootstrap_brokers() call returns BootstrapBrokersTls, but no plain BootstrapBrokers. I suppose that means the Cluster doesn't in fact allow plain text connections. So I'll have to either configure a cert, or get the cluster to communicate without one, right? – Kilian Foth Nov 27 '19 at 07:28
  • Based on this, you may want to do `$ export KAFKA_OPTS="-Djavax.net.debug=all"`, run `kafka-console-producer ...`, and check the trace for any TLS issues. – mazaneicha Nov 27 '19 at 14:07
  • @KilianFoth, yes that's right. Redeploy the cluster as PLAINTEXT only if you don't need the encryption or generate the cert and add that to your lambda deployment package. When we went through this we went with the PLAINTEXT option as the cluster is internal only and inside a VPC. – Garrett Hoffman Nov 27 '19 at 16:17

Since it doesn't work for non-python clients either, it's unlikely that it's a bug in the library.

It seems to be a networking issue.

There is a Kafka broker setting called advertised.listeners, which specifies the address that the client will use after the first connection. In other words, this is what happens when a client consumes or produces:

  1. Using the bootstrap.servers, it establishes the first connection and asks for the real address to use.

  2. The broker answers back with the address specified by advertised.listeners within the brokers configuration.

  3. The client tries consuming or producing using that new address.

This is a security feature that prevents brokers that could be publicly accessible from being consumed from or produced to by clients that shouldn't have access.

How to diagnose

Run the following command:

$ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -L

which returns

Metadata for all topics (from broker -1: ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap):
1 brokers:
  broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092

In this scenario, ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 is the address specified by the client, and even if the client has access to that address/port, ip-172-31-18-160.us-west-2.compute.internal:9092 is the address that will be used to consume/produce.

Now, if you are running Kafka in AWS MSK, it is probably managing this for you. You have to make sure that you can access the address returned by that command. If you can't, you might need to either change it or run your command from a host that has access to it.
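To check the advertised addresses from the client host, something like the following could work. It is a rough sketch that parses the `broker N at host:port` lines in the format shown above and tries a plain TCP connection to each; `advertised_brokers` and `reachable` are hypothetical helper names, and other kafkacat versions may print the metadata differently.

```python
import re
import socket

# Matches lines such as "  broker 0 at some-host.internal:9092".
BROKER_LINE = re.compile(r"^\s*broker\s+(-?\d+)\s+at\s+(\S+):(\d+)", re.MULTILINE)


def advertised_brokers(kafkacat_output):
    """Extract (broker_id, host, port) tuples from `kafkacat -L` output."""
    return [
        (int(broker_id), host, int(port))
        for broker_id, host, port in BROKER_LINE.findall(kafkacat_output)
    ]


def reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    import sys

    # Example: kafkacat -b $BROKERS -L | python check_brokers.py
    for broker_id, host, port in advertised_brokers(sys.stdin.read()):
        state = "reachable" if reachable(host, port) else "NOT reachable"
        print(f"broker {broker_id} at {host}:{port} is {state}")
```

A successful TCP connection only shows that the advertised address is routable from this host. It says nothing about whether the listener behind it expects TLS.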

Another option might be to open an SSH tunnel through a bastion host that has internal access to that address.

You can find more detailed info at: https://rmoff.net/2018/08/02/kafka-listeners-explained

JaviOverflow
  • Well, `./kafkacat -b $BROKERS -L` gives me: "% ERROR: Failed to acquire metadata: Local: Timed out". The trouble is that while I have allowed all network traffic to/from the client host, I don't see how to allow more connections to/from the Kafka message broker hosts, since AWS administrates those for me. – Kilian Foth Nov 25 '19 at 09:28